YARN-5864. Capacity Scheduler - Queue Priorities. (wangda)

This commit is contained in:
Wangda Tan 2017-01-24 14:44:42 -08:00
parent 1672a06135
commit 1309accd68
37 changed files with 2764 additions and 231 deletions

View File

@ -166,10 +166,6 @@
<Class name="org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.RecoveryComparator" />
<Bug pattern="SE_COMPARATOR_SHOULD_BE_SERIALIZABLE" />
</Match>
<Match>
<Class name="org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.PartitionedQueueComparator" />
<Bug pattern="SE_COMPARATOR_SHOULD_BE_SERIALIZABLE" />
</Match>
<!-- Ignore some irrelevant class name warning -->
<Match>
<Class name="org.apache.hadoop.yarn.api.records.SerializedException" />

View File

@ -19,6 +19,7 @@
package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.policy.PriorityUtilizationQueueOrderingPolicy;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;
@ -49,13 +50,11 @@ static class TQComparator implements Comparator<TempQueuePerPartition> {
@Override
public int compare(TempQueuePerPartition tq1, TempQueuePerPartition tq2) {
if (getIdealPctOfGuaranteed(tq1) < getIdealPctOfGuaranteed(tq2)) {
return -1;
}
if (getIdealPctOfGuaranteed(tq1) > getIdealPctOfGuaranteed(tq2)) {
return 1;
}
return 0;
double assigned1 = getIdealPctOfGuaranteed(tq1);
double assigned2 = getIdealPctOfGuaranteed(tq2);
return PriorityUtilizationQueueOrderingPolicy.compare(assigned1,
assigned2, tq1.relativePriority, tq2.relativePriority);
}
// Calculates idealAssigned / guaranteed
@ -156,6 +155,7 @@ protected void computeFixpointAllocation(Resource totGuarant,
// way, the most underserved queue(s) are always given resources first.
Collection<TempQueuePerPartition> underserved = getMostUnderservedQueues(
orderedByNeed, tqComparator);
for (Iterator<TempQueuePerPartition> i = underserved.iterator(); i
.hasNext();) {
TempQueuePerPartition sub = i.next();

View File

@ -48,7 +48,8 @@ public abstract class PreemptionCandidatesSelector {
* @param selectedCandidates already selected candidates from previous policies
* @param clusterResource total resource
* @param totalPreemptedResourceAllowed how many resources allowed to be
* preempted in this round
* preempted in this round. Should be
* updated(in-place set) after the call
* @return merged selected candidates.
*/
public abstract Map<ApplicationAttemptId, Set<RMContainer>> selectCandidates(

View File

@ -19,6 +19,7 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableSet;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@ -193,6 +194,14 @@ public void init(Configuration config, RMContext context,
rc = scheduler.getResourceCalculator();
nlm = scheduler.getRMContext().getNodeLabelManager();
// Do we need white queue-priority preemption policy?
boolean isQueuePriorityPreemptionEnabled =
csConfig.getPUOrderingPolicyUnderUtilizedPreemptionEnabled();
if (isQueuePriorityPreemptionEnabled) {
candidatesSelectionPolicies.add(
new QueuePriorityContainerCandidateSelector(this));
}
// Do we need to specially consider reserved containers?
boolean selectCandidatesForResevedContainers = csConfig.getBoolean(
CapacitySchedulerConfiguration.
@ -352,6 +361,8 @@ private void containerBasedPreemptOrKill(CSQueue root,
.clone(nlm.getResourceByLabel(partitionToLookAt, clusterResources)),
partitionToLookAt);
}
// Update effective priority of queues
}
this.leafQueueNames = ImmutableSet.copyOf(getLeafQueueNames(
@ -368,13 +379,28 @@ private void containerBasedPreemptOrKill(CSQueue root,
new HashMap<>();
for (PreemptionCandidatesSelector selector :
candidatesSelectionPolicies) {
long startTime = 0;
if (LOG.isDebugEnabled()) {
LOG.debug(MessageFormat
.format("Trying to use {0} to select preemption candidates",
selector.getClass().getName()));
startTime = clock.getTime();
}
toPreempt = selector.selectCandidates(toPreempt,
clusterResources, totalPreemptionAllowed);
if (LOG.isDebugEnabled()) {
LOG.debug(MessageFormat
.format("{0} uses {1} millisecond to run",
selector.getClass().getName(), clock.getTime() - startTime));
int totalSelected = 0;
for (Set<RMContainer> set : toPreempt.values()) {
totalSelected += set.size();
}
LOG.debug(MessageFormat
.format("So far, total {0} containers selected to be preempted",
totalSelected));
}
}
if (LOG.isDebugEnabled()) {
@ -470,11 +496,22 @@ private TempQueuePerPartition cloneQueues(CSQueue curQueue,
reserved, curQueue);
if (curQueue instanceof ParentQueue) {
String configuredOrderingPolicy =
((ParentQueue) curQueue).getQueueOrderingPolicy().getConfigName();
// Recursively add children
for (CSQueue c : curQueue.getChildQueues()) {
TempQueuePerPartition subq = cloneQueues(c, partitionResource,
partitionToLookAt);
// If we respect priority
if (StringUtils.equals(
CapacitySchedulerConfiguration.QUEUE_PRIORITY_UTILIZATION_ORDERING_POLICY,
configuredOrderingPolicy)) {
subq.relativePriority = c.getPriority().getPriority();
}
ret.addChild(subq);
subq.parent = ret;
}
}
} finally {

View File

@ -0,0 +1,510 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
import com.google.common.collect.HashBasedTable;
import com.google.common.collect.Table;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.util.resource.Resources;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
public class QueuePriorityContainerCandidateSelector
extends PreemptionCandidatesSelector {
private static final Log LOG =
LogFactory.getLog(QueuePriorityContainerCandidateSelector.class);
// Configured timeout before doing reserved container preemption
private long minTimeout;
// Allow move reservation around for better placement?
private boolean allowMoveReservation;
// All the reserved containers of the system which could possible preempt from
// queue with lower priorities
private List<RMContainer> reservedContainers;
// From -> To
// A digraph to represent if one queue has higher priority than another.
// For example, a->b means queue=a has higher priority than queue=b
private Table<String, String, Boolean> priorityDigraph =
HashBasedTable.create();
private Resource clusterResource;
private Map<ApplicationAttemptId, Set<RMContainer>> selectedCandidates;
private Resource totalPreemptionAllowed;
// A cached scheduler node map, will be refreshed each round.
private Map<NodeId, TempSchedulerNode> tempSchedulerNodeMap = new HashMap<>();
// Have we touched (make any changes to the node) for this round
// Once a node is touched, we will not try to move reservations to the node
private Set<NodeId> touchedNodes;
// Resource which marked to preempt from other queues.
// <Queue, Partition, Resource-marked-to-be-preempted-from-other-queue>
private Table<String, String, Resource> toPreemptedFromOtherQueues =
HashBasedTable.create();
private final Comparator<RMContainer>
CONTAINER_CREATION_TIME_COMPARATOR = new Comparator<RMContainer>() {
@Override
public int compare(RMContainer o1, RMContainer o2) {
if (preemptionAllowed(o1.getQueueName(), o2.getQueueName())) {
return -1;
} else if (preemptionAllowed(o2.getQueueName(), o1.getQueueName())) {
return 1;
}
// If two queues cannot preempt each other, compare creation time.
return Long.compare(o1.getCreationTime(), o2.getCreationTime());
}
};
QueuePriorityContainerCandidateSelector(
CapacitySchedulerPreemptionContext preemptionContext) {
super(preemptionContext);
// Initialize parameters
CapacitySchedulerConfiguration csc =
preemptionContext.getScheduler().getConfiguration();
minTimeout = csc.getPUOrderingPolicyUnderUtilizedPreemptionDelay();
allowMoveReservation =
csc.getPUOrderingPolicyUnderUtilizedPreemptionMoveReservation();
}
private List<TempQueuePerPartition> getPathToRoot(TempQueuePerPartition tq) {
List<TempQueuePerPartition> list = new ArrayList<>();
while (tq != null) {
list.add(tq);
tq = tq.parent;
}
return list;
}
private void intializePriorityDigraph() {
LOG.info("Initializing priority preemption directed graph:");
// Make sure we iterate all leaf queue combinations
for (String q1 : preemptionContext.getLeafQueueNames()) {
for (String q2 : preemptionContext.getLeafQueueNames()) {
// Make sure we only calculate each combination once instead of all
// permutations
if (q1.compareTo(q2) < 0) {
TempQueuePerPartition tq1 = preemptionContext.getQueueByPartition(q1,
RMNodeLabelsManager.NO_LABEL);
TempQueuePerPartition tq2 = preemptionContext.getQueueByPartition(q2,
RMNodeLabelsManager.NO_LABEL);
List<TempQueuePerPartition> path1 = getPathToRoot(tq1);
List<TempQueuePerPartition> path2 = getPathToRoot(tq2);
// Get direct ancestor below LCA (Lowest common ancestor)
int i = path1.size() - 1;
int j = path2.size() - 1;
while (path1.get(i).queueName.equals(path2.get(j).queueName)) {
i--;
j--;
}
// compare priority of path1[i] and path2[j]
int p1 = path1.get(i).relativePriority;
int p2 = path2.get(j).relativePriority;
if (p1 < p2) {
priorityDigraph.put(q2, q1, true);
if (LOG.isDebugEnabled()) {
LOG.info("- Added priority ordering edge: " + q2 + " >> " + q1);
}
} else if (p2 < p1) {
priorityDigraph.put(q1, q2, true);
if (LOG.isDebugEnabled()) {
LOG.info("- Added priority ordering edge: " + q1 + " >> " + q2);
}
}
}
}
}
}
/**
* Do we allow demandingQueue preempt resource from toBePreemptedQueue
*
* @param demandingQueue demandingQueue
* @param toBePreemptedQueue toBePreemptedQueue
* @return can/cannot
*/
private boolean preemptionAllowed(String demandingQueue,
String toBePreemptedQueue) {
return priorityDigraph.contains(demandingQueue,
toBePreemptedQueue);
}
/**
* Can we preempt enough resource for given:
*
* @param requiredResource askedResource
* @param demandingQueue demandingQueue
* @param schedulerNode node
* @param lookingForNewReservationPlacement Are we trying to look for move
* reservation to the node
* @param newlySelectedContainers newly selected containers, will be set when
* we can preempt enough resources from the node.
*
* @return can/cannot
*/
private boolean canPreemptEnoughResourceForAsked(Resource requiredResource,
String demandingQueue, FiCaSchedulerNode schedulerNode,
boolean lookingForNewReservationPlacement,
List<RMContainer> newlySelectedContainers) {
// Do not check touched nodes again.
if (touchedNodes.contains(schedulerNode.getNodeID())) {
return false;
}
TempSchedulerNode node = tempSchedulerNodeMap.get(schedulerNode.getNodeID());
if (null == node) {
node = TempSchedulerNode.fromSchedulerNode(schedulerNode);
tempSchedulerNodeMap.put(schedulerNode.getNodeID(), node);
}
if (null != schedulerNode.getReservedContainer()
&& lookingForNewReservationPlacement) {
// Node reserved by the others, skip this node
// We will not try to move the reservation to node which reserved already.
return false;
}
// Need to preemption = asked - (node.total - node.allocated)
Resource lacking = Resources.subtract(requiredResource, Resources
.subtract(node.getTotalResource(), node.getAllocatedResource()));
// On each host, simply check if we could preempt containers from
// lower-prioritized queues or not
List<RMContainer> runningContainers = node.getRunningContainers();
Collections.sort(runningContainers, CONTAINER_CREATION_TIME_COMPARATOR);
// First of all, consider already selected containers
for (RMContainer runningContainer : runningContainers) {
if (CapacitySchedulerPreemptionUtils.isContainerAlreadySelected(
runningContainer, selectedCandidates)) {
Resources.subtractFrom(lacking,
runningContainer.getAllocatedResource());
}
}
// If we already can allocate the reserved container after preemption,
// skip following steps
if (Resources.fitsIn(rc, clusterResource, lacking,
Resources.none())) {
return true;
}
Resource allowed = Resources.clone(totalPreemptionAllowed);
Resource selected = Resources.createResource(0);
for (RMContainer runningContainer : runningContainers) {
if (CapacitySchedulerPreemptionUtils.isContainerAlreadySelected(
runningContainer, selectedCandidates)) {
// ignore selected containers
continue;
}
// Only preempt resource from queue with lower priority
if (!preemptionAllowed(demandingQueue,
runningContainer.getQueueName())) {
continue;
}
// Don't preempt AM container
if (runningContainer.isAMContainer()) {
continue;
}
// Not allow to preempt more than limit
if (Resources.greaterThanOrEqual(rc, clusterResource, allowed,
runningContainer.getAllocatedResource())) {
Resources.subtractFrom(allowed,
runningContainer.getAllocatedResource());
Resources.subtractFrom(lacking,
runningContainer.getAllocatedResource());
Resources.addTo(selected, runningContainer.getAllocatedResource());
if (null != newlySelectedContainers) {
newlySelectedContainers.add(runningContainer);
}
}
// Lacking <= 0 means we can allocate the reserved container
if (Resources.fitsIn(rc, clusterResource, lacking, Resources.none())) {
return true;
}
}
return false;
}
private boolean preChecksForMovingReservedContainerToNode(
RMContainer reservedContainer, FiCaSchedulerNode newNode) {
// Don't do this if it has hard-locality preferences
if (reservedContainer.hasIncreaseReservation()) {
// This means a container update request (like increase / promote)
return false;
}
// For normal requests
FiCaSchedulerApp app =
preemptionContext.getScheduler().getApplicationAttempt(
reservedContainer.getApplicationAttemptId());
ResourceRequest offswithRequest = app.getAppSchedulingInfo().getResourceRequest(
reservedContainer.getReservedSchedulerKey(), ResourceRequest.ANY);
if (!offswithRequest.getRelaxLocality()) {
// This is a hard locality request
return false;
}
// Check if newNode's partition matches requested partition
if (!StringUtils.equals(reservedContainer.getNodeLabelExpression(),
newNode.getPartition())) {
return false;
}
return true;
}
private void tryToMakeBetterReservationPlacement(
RMContainer reservedContainer,
List<FiCaSchedulerNode> allSchedulerNodes) {
for (FiCaSchedulerNode targetNode : allSchedulerNodes) {
// Precheck if we can move the rmContainer to the new targetNode
if (!preChecksForMovingReservedContainerToNode(reservedContainer,
targetNode)) {
continue;
}
if (canPreemptEnoughResourceForAsked(
reservedContainer.getReservedResource(),
reservedContainer.getQueueName(), targetNode, true, null)) {
NodeId fromNode = reservedContainer.getNodeId();
// We can place container to this targetNode, so just go ahead and notify
// scheduler
if (preemptionContext.getScheduler().moveReservedContainer(
reservedContainer, targetNode)) {
LOG.info("Successfully moved reserved container=" + reservedContainer
.getContainerId() + " from targetNode=" + fromNode
+ " to targetNode=" + targetNode.getNodeID());
touchedNodes.add(targetNode.getNodeID());
}
}
}
}
/**
* Do we allow the demanding queue preempt resource from other queues?
* A satisfied queue is not allowed to preempt resource from other queues.
* @param demandingQueue
* @return allowed/not
*/
private boolean isQueueSatisfied(String demandingQueue,
String partition) {
TempQueuePerPartition tq = preemptionContext.getQueueByPartition(
demandingQueue, partition);
if (null == tq) {
return false;
}
Resource guaranteed = tq.getGuaranteed();
Resource usedDeductReservd = Resources.subtract(tq.getUsed(),
tq.getReserved());
Resource markedToPreemptFromOtherQueue = toPreemptedFromOtherQueues.get(
demandingQueue, partition);
if (null == markedToPreemptFromOtherQueue) {
markedToPreemptFromOtherQueue = Resources.none();
}
// return Used - reserved + to-preempt-from-other-queue >= guaranteed
boolean flag = Resources.greaterThanOrEqual(rc, clusterResource,
Resources.add(usedDeductReservd, markedToPreemptFromOtherQueue),
guaranteed);
return flag;
}
private void incToPreempt(String queue, String partition,
Resource allocated) {
Resource total = toPreemptedFromOtherQueues.get(queue, partition);
if (null == total) {
total = Resources.createResource(0);
toPreemptedFromOtherQueues.put(queue, partition, total);
}
Resources.addTo(total, allocated);
}
@Override
public Map<ApplicationAttemptId, Set<RMContainer>> selectCandidates(
Map<ApplicationAttemptId, Set<RMContainer>> selectedCandidates,
Resource clusterResource,
Resource totalPreemptedResourceAllowed) {
// Initialize digraph from queues
// TODO (wangda): only do this when queue refreshed.
priorityDigraph.clear();
intializePriorityDigraph();
// When all queues are set to same priority, or priority is not respected,
// direct return.
if (priorityDigraph.isEmpty()) {
return selectedCandidates;
}
// Save parameters to be shared by other methods
this.selectedCandidates = selectedCandidates;
this.clusterResource = clusterResource;
this.totalPreemptionAllowed = totalPreemptedResourceAllowed;
toPreemptedFromOtherQueues.clear();
reservedContainers = new ArrayList<>();
// Clear temp-scheduler-node-map every time when doing selection of
// containers.
tempSchedulerNodeMap.clear();
touchedNodes = new HashSet<>();
// Add all reserved containers for analysis
List<FiCaSchedulerNode> allSchedulerNodes =
preemptionContext.getScheduler().getAllNodes();
for (FiCaSchedulerNode node : allSchedulerNodes) {
RMContainer reservedContainer = node.getReservedContainer();
if (null != reservedContainer) {
// Add to reservedContainers list if the queue that the reserved
// container belongs to has high priority than at least one queue
if (priorityDigraph.containsRow(
reservedContainer.getQueueName())) {
reservedContainers.add(reservedContainer);
}
}
}
// Sort reserved container by creation time
Collections.sort(reservedContainers, CONTAINER_CREATION_TIME_COMPARATOR);
long currentTime = System.currentTimeMillis();
// From the begining of the list
for (RMContainer reservedContainer : reservedContainers) {
// Only try to preempt reserved container after reserved container created
// and cannot be allocated after minTimeout
if (currentTime - reservedContainer.getCreationTime() < minTimeout) {
continue;
}
FiCaSchedulerNode node = preemptionContext.getScheduler().getNode(
reservedContainer.getReservedNode());
if (null == node) {
// Something is wrong, ignore
continue;
}
List<RMContainer> newlySelectedToBePreemptContainers = new ArrayList<>();
// Check if we can preempt for this queue
// We will skip if the demanding queue is already satisfied.
String demandingQueueName = reservedContainer.getQueueName();
boolean demandingQueueSatisfied = isQueueSatisfied(demandingQueueName,
node.getPartition());
// We will continue check if it is possible to preempt reserved container
// from the node.
boolean canPreempt = false;
if (!demandingQueueSatisfied) {
canPreempt = canPreemptEnoughResourceForAsked(
reservedContainer.getReservedResource(), demandingQueueName, node,
false, newlySelectedToBePreemptContainers);
}
// Add selected container if we can allocate reserved container by
// preemption others
if (canPreempt) {
touchedNodes.add(node.getNodeID());
if (LOG.isDebugEnabled()) {
LOG.debug("Trying to preempt following containers to make reserved "
+ "container=" + reservedContainer.getContainerId() + " on node="
+ node.getNodeID() + " can be allocated:");
}
// Update to-be-preempt
incToPreempt(demandingQueueName, node.getPartition(),
reservedContainer.getReservedResource());
for (RMContainer c : newlySelectedToBePreemptContainers) {
if (LOG.isDebugEnabled()) {
LOG.debug(" --container=" + c.getContainerId() + " resource=" + c
.getReservedResource());
}
Set<RMContainer> containers = selectedCandidates.get(
c.getApplicationAttemptId());
if (null == containers) {
containers = new HashSet<>();
selectedCandidates.put(c.getApplicationAttemptId(), containers);
}
containers.add(c);
// Update totalPreemptionResourceAllowed
Resources.subtractFrom(totalPreemptedResourceAllowed,
c.getAllocatedResource());
}
} else if (!demandingQueueSatisfied) {
// We failed to get enough resource to allocate the container
// This typically happens when the reserved node is proper, will
// try to see if we can reserve the container on a better host.
// Only do this if the demanding queue is not satisfied.
//
// TODO (wangda): do more tests before making it usable
//
if (allowMoveReservation) {
tryToMakeBetterReservationPlacement(reservedContainer,
allSchedulerNodes);
}
}
}
return selectedCandidates;
}
}

View File

@ -53,6 +53,12 @@ public class TempQueuePerPartition extends AbstractPreemptionEntity {
protected Resource pendingDeductReserved;
// Relative priority of this queue to its parent
// If parent queue's ordering policy doesn't respect priority,
// this will be always 0
int relativePriority = 0;
TempQueuePerPartition parent = null;
TempQueuePerPartition(String queueName, Resource current,
boolean preemptionDisabled, String partition, Resource killable,
float absCapacity, float absMaxCapacity, Resource totalPartitionResource,
@ -114,8 +120,15 @@ Resource offer(Resource avail, ResourceCalculator rc,
Resource absMaxCapIdealAssignedDelta = Resources.componentwiseMax(
Resources.subtract(getMax(), idealAssigned),
Resource.newInstance(0, 0));
// remain = avail - min(avail, (max - assigned), (current + pending -
// assigned))
// accepted = min{avail,
// max - assigned,
// current + pending - assigned,
// # Make sure a queue will not get more than max of its
// # used/guaranteed, this is to make sure preemption won't
// # happen if all active queues are beyond their guaranteed
// # This is for leaf queue only.
// max(guaranteed, used) - assigned}
// remain = avail - accepted
Resource accepted = Resources.min(rc, clusterResource,
absMaxCapIdealAssignedDelta,
Resources.min(rc, clusterResource, avail, Resources
@ -137,6 +150,21 @@ Resource offer(Resource avail, ResourceCalculator rc,
.subtract(Resources.add(getUsed(),
(considersReservedResource ? pending : pendingDeductReserved)),
idealAssigned)));
// For leaf queue: accept = min(accept, max(guaranteed, used) - assigned)
// Why only for leaf queue?
// Because for a satisfied parent queue, it could have some under-utilized
// leaf queues. Such under-utilized leaf queue could preemption resources
// from over-utilized leaf queue located at other hierarchies.
if (null == children || children.isEmpty()) {
Resource maxOfGuranteedAndUsedDeductAssigned = Resources.subtract(
Resources.max(rc, clusterResource, getUsed(), getGuaranteed()),
idealAssigned);
maxOfGuranteedAndUsedDeductAssigned = Resources.max(rc, clusterResource,
maxOfGuranteedAndUsedDeductAssigned, Resources.none());
accepted = Resources.min(rc, clusterResource, accepted,
maxOfGuranteedAndUsedDeductAssigned);
}
Resource remain = Resources.subtract(avail, accepted);
Resources.addTo(idealAssigned, accepted);
return remain;

View File

@ -0,0 +1,120 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.util.resource.Resources;
import java.util.List;
/**
* This class will save necessary information which copied from
* FiCaSchedulerNode. This is added majorly for performance consideration, this
* can be cached to avoid hitting scheduler again and again. In addition,
* we can add some preemption-required fields to the class.
*/
public class TempSchedulerNode {
private List<RMContainer> runningContainers;
private RMContainer reservedContainer;
private Resource totalResource;
// excluded reserved resource
private Resource allocatedResource;
// total - allocated
private Resource availableResource;
// just a shortcut of reservedContainer.getResource.
private Resource reservedResource;
private NodeId nodeId;
public static TempSchedulerNode fromSchedulerNode(
FiCaSchedulerNode schedulerNode) {
TempSchedulerNode n = new TempSchedulerNode();
n.totalResource = Resources.clone(schedulerNode.getTotalResource());
n.allocatedResource = Resources.clone(schedulerNode.getAllocatedResource());
n.runningContainers = schedulerNode.getCopiedListOfRunningContainers();
n.reservedContainer = schedulerNode.getReservedContainer();
if (n.reservedContainer != null) {
n.reservedResource = n.reservedContainer.getReservedResource();
} else {
n.reservedResource = Resources.none();
}
n.availableResource = Resources.subtract(n.totalResource,
n.allocatedResource);
n.nodeId = schedulerNode.getNodeID();
return n;
}
public NodeId getNodeId() {
return nodeId;
}
public List<RMContainer> getRunningContainers() {
return runningContainers;
}
public void setRunningContainers(List<RMContainer> runningContainers) {
this.runningContainers = runningContainers;
}
public RMContainer getReservedContainer() {
return reservedContainer;
}
public void setReservedContainer(RMContainer reservedContainer) {
this.reservedContainer = reservedContainer;
}
public Resource getTotalResource() {
return totalResource;
}
public void setTotalResource(Resource totalResource) {
this.totalResource = totalResource;
}
public Resource getAllocatedResource() {
return allocatedResource;
}
public void setAllocatedResource(Resource allocatedResource) {
this.allocatedResource = allocatedResource;
}
public Resource getAvailableResource() {
return availableResource;
}
public void setAvailableResource(Resource availableResource) {
this.availableResource = availableResource;
}
public Resource getReservedResource() {
return reservedResource;
}
public void setReservedResource(Resource reservedResource) {
this.reservedResource = reservedResource;
}
}

View File

@ -549,6 +549,11 @@ public void transition(RMContainerImpl container, RMContainerEvent event) {
container.reservedNode = e.getReservedNode();
container.reservedSchedulerKey = e.getReservedSchedulerKey();
Container c = container.getContainer();
if (c != null) {
c.setNodeId(container.reservedNode);
}
if (!EnumSet.of(RMContainerState.NEW, RMContainerState.RESERVED)
.contains(container.getState())) {
// When container's state != NEW/RESERVED, it is an increase reservation

View File

@ -407,7 +407,7 @@ public synchronized RMContainer getReservedContainer() {
* Set the reserved container in the node.
* @param reservedContainer Reserved container in the node.
*/
protected synchronized void
public synchronized void
setReservedContainer(RMContainer reservedContainer) {
this.reservedContainer = reservedContainer;
}

View File

@ -109,6 +109,8 @@ public abstract class AbstractCSQueue implements CSQueue {
protected ReentrantReadWriteLock.ReadLock readLock;
protected ReentrantReadWriteLock.WriteLock writeLock;
volatile Priority priority = Priority.newInstance(0);
public AbstractCSQueue(CapacitySchedulerContext cs,
String queueName, CSQueue parent, CSQueue old) throws IOException {
this.labelManager = cs.getRMContext().getNodeLabelManager();
@ -336,6 +338,9 @@ void setupQueueConfigs(Resource clusterResource)
csContext.getConfiguration().getReservationContinueLook();
this.preemptionDisabled = isQueueHierarchyPreemptionDisabled(this);
this.priority = csContext.getConfiguration().getQueuePriority(
getQueuePath());
} finally {
writeLock.unlock();
}
@ -934,4 +939,9 @@ protected void appFinished() {
this.writeLock.unlock();
}
}
@Override
public Priority getPriority() {
return this.priority;
}
}

View File

@ -32,6 +32,7 @@
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.QueueState;
import org.apache.hadoop.yarn.api.records.Resource;
@ -372,4 +373,10 @@ void apply(Resource cluster,
*/
public void validateSubmitApplication(ApplicationId applicationId,
String userName, String queue) throws AccessControlException;
/**
* Get priority of queue
* @return queue priority
*/
Priority getPriority();
}

View File

@ -268,16 +268,6 @@ public void setResourceCalculator(ResourceCalculator rc) {
this.calculator = rc;
}
@Override
public Comparator<CSQueue> getNonPartitionedQueueComparator() {
return CapacitySchedulerQueueManager.NON_PARTITIONED_QUEUE_COMPARATOR;
}
@Override
public PartitionedQueueComparator getPartitionedQueueComparator() {
return CapacitySchedulerQueueManager.PARTITIONED_QUEUE_COMPARATOR;
}
@Override
public int getNumClusterNodes() {
return nodeTracker.nodeCount();
@ -2505,4 +2495,69 @@ public int getAsyncSchedulingPendingBacklogs() {
public CapacitySchedulerQueueManager getCapacitySchedulerQueueManager() {
return this.queueManager;
}
/**
* Try to move a reserved container to a targetNode.
* If the targetNode is reserved by another application (other than this one).
* The previous reservation will be cancelled.
*
* @param toBeMovedContainer reserved container will be moved
* @param targetNode targetNode
* @return true if move succeeded. Return false if the targetNode is reserved by
* a different container or move failed because of any other reasons.
*/
public boolean moveReservedContainer(RMContainer toBeMovedContainer,
FiCaSchedulerNode targetNode) {
try {
writeLock.lock();
if (LOG.isDebugEnabled()) {
LOG.debug("Trying to move container=" + toBeMovedContainer + " to node="
+ targetNode.getNodeID());
}
FiCaSchedulerNode sourceNode = getNode(toBeMovedContainer.getNodeId());
if (null == sourceNode) {
if (LOG.isDebugEnabled()) {
LOG.debug("Failed to move reservation, cannot find source node="
+ toBeMovedContainer.getNodeId());
}
return false;
}
// Target node updated?
if (getNode(targetNode.getNodeID()) != targetNode) {
if (LOG.isDebugEnabled()) {
LOG.debug(
"Failed to move reservation, node updated or removed, moving "
+ "cancelled.");
}
return false;
}
// Target node's reservation status changed?
if (targetNode.getReservedContainer() != null) {
if (LOG.isDebugEnabled()) {
LOG.debug(
"Target node's reservation status changed, moving cancelled.");
}
return false;
}
FiCaSchedulerApp app = getApplicationAttempt(
toBeMovedContainer.getApplicationAttemptId());
if (null == app) {
if (LOG.isDebugEnabled()) {
LOG.debug("Cannot find to-be-moved container's application="
+ toBeMovedContainer.getApplicationAttemptId());
}
return false;
}
// finally, move the reserved container
return app.moveReservation(toBeMovedContainer, sourceNode, targetNode);
} finally {
writeLock.unlock();
}
}
}

View File

@ -18,19 +18,8 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.StringTokenizer;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableSet;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
@ -44,6 +33,7 @@
import org.apache.hadoop.yarn.api.records.ReservationACL;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
import org.apache.hadoop.yarn.security.AccessType;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
@ -51,6 +41,8 @@
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.AppPriorityACLConfigurationParser.AppPriorityACLKeyType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.policy.PriorityUtilizationQueueOrderingPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.policy.QueueOrderingPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FairOrderingPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FifoOrderingPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.OrderingPolicy;
@ -59,7 +51,17 @@
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;
import com.google.common.collect.ImmutableSet;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.StringTokenizer;
public class CapacitySchedulerConfiguration extends ReservationSchedulerConfiguration {
@ -128,13 +130,20 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
public static final String MAXIMUM_ALLOCATION_VCORES =
"maximum-allocation-vcores";
/**
* Ordering policy of queues
*/
public static final String ORDERING_POLICY = "ordering-policy";
public static final String FIFO_ORDERING_POLICY = "fifo";
/*
* Ordering policy inside a leaf queue to sort apps
*/
public static final String FIFO_APP_ORDERING_POLICY = "fifo";
public static final String FAIR_ORDERING_POLICY = "fair";
public static final String FAIR_APP_ORDERING_POLICY = "fair";
public static final String DEFAULT_ORDERING_POLICY = FIFO_ORDERING_POLICY;
public static final String DEFAULT_APP_ORDERING_POLICY =
FIFO_APP_ORDERING_POLICY;
@Private
public static final int DEFAULT_MAXIMUM_SYSTEM_APPLICATIIONS = 10000;
@ -299,6 +308,11 @@ static String getQueuePrefix(String queue) {
return queueName;
}
static String getQueueOrderingPolicyPrefix(String queue) {
String queueName = PREFIX + queue + DOT + ORDERING_POLICY + DOT;
return queueName;
}
private String getNodeLabelPrefix(String queue, String label) {
if (label.equals(CommonNodeLabelsManager.NO_LABEL)) {
return getQueuePrefix(queue);
@ -401,19 +415,22 @@ public int getUserLimit(String queue) {
return userLimit;
}
// TODO (wangda): We need to better distinguish app ordering policy and queue
// ordering policy's classname / configuration options, etc. And dedup code
// if possible.
@SuppressWarnings("unchecked")
public <S extends SchedulableEntity> OrderingPolicy<S> getOrderingPolicy(
public <S extends SchedulableEntity> OrderingPolicy<S> getAppOrderingPolicy(
String queue) {
String policyType = get(getQueuePrefix(queue) + ORDERING_POLICY,
DEFAULT_ORDERING_POLICY);
DEFAULT_APP_ORDERING_POLICY);
OrderingPolicy<S> orderingPolicy;
if (policyType.trim().equals(FIFO_ORDERING_POLICY)) {
if (policyType.trim().equals(FIFO_APP_ORDERING_POLICY)) {
policyType = FifoOrderingPolicy.class.getName();
}
if (policyType.trim().equals(FAIR_ORDERING_POLICY)) {
if (policyType.trim().equals(FAIR_APP_ORDERING_POLICY)) {
policyType = FairOrderingPolicy.class.getName();
}
try {
@ -734,6 +751,20 @@ public Resource getMaximumAllocation() {
return Resources.createResource(maximumMemory, maximumCores);
}
@Private
public Priority getQueuePriority(String queue) {
String queuePolicyPrefix = getQueuePrefix(queue);
Priority pri = Priority.newInstance(
getInt(queuePolicyPrefix + "priority", 0));
return pri;
}
@Private
public void setQueuePriority(String queue, int priority) {
String queuePolicyPrefix = getQueuePrefix(queue);
setInt(queuePolicyPrefix + "priority", priority);
}
/**
* Get the per queue setting for the maximum limit to allocate to
* each container request.
@ -1204,4 +1235,161 @@ public int getGlobalMaximumApplicationsPerQueue() {
getInt(QUEUE_GLOBAL_MAX_APPLICATION, (int) UNDEFINED);
return maxApplicationsPerQueue;
}
/**
* Ordering policy inside a parent queue to sort queues
*/
/**
* Less relative usage queue can get next resource, this is default
*/
public static final String QUEUE_UTILIZATION_ORDERING_POLICY = "utilization";
/**
* Combination of relative usage and priority
*/
public static final String QUEUE_PRIORITY_UTILIZATION_ORDERING_POLICY =
"priority-utilization";
public static final String DEFAULT_QUEUE_ORDERING_POLICY =
QUEUE_UTILIZATION_ORDERING_POLICY;
@Private
public void setQueueOrderingPolicy(String queue, String policy) {
set(getQueuePrefix(queue) + ORDERING_POLICY, policy);
}
@Private
public QueueOrderingPolicy getQueueOrderingPolicy(String queue,
String parentPolicy) {
String defaultPolicy = parentPolicy;
if (null == defaultPolicy) {
defaultPolicy = DEFAULT_QUEUE_ORDERING_POLICY;
}
String policyType = get(getQueuePrefix(queue) + ORDERING_POLICY,
defaultPolicy);
QueueOrderingPolicy qop;
if (policyType.trim().equals(QUEUE_UTILIZATION_ORDERING_POLICY)) {
// Doesn't respect priority
qop = new PriorityUtilizationQueueOrderingPolicy(false);
} else if (policyType.trim().equals(
QUEUE_PRIORITY_UTILIZATION_ORDERING_POLICY)) {
qop = new PriorityUtilizationQueueOrderingPolicy(true);
} else {
String message =
"Unable to construct queue ordering policy=" + policyType + " queue="
+ queue;
throw new YarnRuntimeException(message);
}
return qop;
}
/*
* Get global configuration for ordering policies
*/
private String getOrderingPolicyGlobalConfigKey(String orderPolicyName,
String configKey) {
return PREFIX + ORDERING_POLICY + DOT + orderPolicyName + DOT + configKey;
}
/**
* Global configurations of queue-priority-utilization ordering policy
*/
private static final String UNDER_UTILIZED_PREEMPTION_ENABLED =
"underutilized-preemption.enabled";
/**
* Do we allow under-utilized queue with higher priority to preempt queue
* with lower priority *even if queue with lower priority is not satisfied*.
*
* For example, two queues, a and b
* a.priority = 1, (a.used-capacity - a.reserved-capacity) = 40%
* b.priority = 0, b.used-capacity = 30%
*
* Set this configuration to true to allow queue-a to preempt container from
* queue-b.
*
* (The reason why deduct reserved-capacity from used-capacity for queue with
* higher priority is: the reserved-capacity is just scheduler's internal
* implementation to allocate large containers, it is not possible for
* application to use such reserved-capacity. It is possible that a queue with
* large container requests have a large number of containers but cannot
* allocate from any of them. But scheduler will make sure a satisfied queue
* will not preempt resource from any other queues. A queue is considered to
* be satisfied when queue's used-capacity - reserved-capacity
* guaranteed-capacity.)
*
* @return allowed or not
*/
public boolean getPUOrderingPolicyUnderUtilizedPreemptionEnabled() {
return getBoolean(getOrderingPolicyGlobalConfigKey(
QUEUE_PRIORITY_UTILIZATION_ORDERING_POLICY,
UNDER_UTILIZED_PREEMPTION_ENABLED), false);
}
@VisibleForTesting
public void setPUOrderingPolicyUnderUtilizedPreemptionEnabled(
boolean enabled) {
setBoolean(getOrderingPolicyGlobalConfigKey(
QUEUE_PRIORITY_UTILIZATION_ORDERING_POLICY,
UNDER_UTILIZED_PREEMPTION_ENABLED), enabled);
}
private static final String UNDER_UTILIZED_PREEMPTION_DELAY =
"underutilized-preemption.reserved-container-delay-ms";
/**
* When a reserved container of an underutilized queue is created. Preemption
* will kick in after specified delay (in ms).
*
* The total time to preempt resources for a reserved container from higher
* priority queue will be: reserved-container-delay-ms +
* {@link CapacitySchedulerConfiguration#PREEMPTION_WAIT_TIME_BEFORE_KILL}.
*
* This parameter is added to make preemption from lower priority queue which
* is underutilized to be more careful. This parameter takes effect when
* underutilized-preemption.enabled set to true.
*
* @return delay
*/
public long getPUOrderingPolicyUnderUtilizedPreemptionDelay() {
return getLong(getOrderingPolicyGlobalConfigKey(
QUEUE_PRIORITY_UTILIZATION_ORDERING_POLICY,
UNDER_UTILIZED_PREEMPTION_DELAY), 60000L);
}
@VisibleForTesting
public void setPUOrderingPolicyUnderUtilizedPreemptionDelay(
long timeout) {
setLong(getOrderingPolicyGlobalConfigKey(
QUEUE_PRIORITY_UTILIZATION_ORDERING_POLICY,
UNDER_UTILIZED_PREEMPTION_DELAY), timeout);
}
private static final String UNDER_UTILIZED_PREEMPTION_MOVE_RESERVATION =
"underutilized-preemption.allow-move-reservation";
/**
* When doing preemption from under-satisfied queues for priority queue.
* Do we allow move reserved container from one host to another?
*
* @return allow or not
*/
public boolean getPUOrderingPolicyUnderUtilizedPreemptionMoveReservation() {
return getBoolean(getOrderingPolicyGlobalConfigKey(
QUEUE_PRIORITY_UTILIZATION_ORDERING_POLICY,
UNDER_UTILIZED_PREEMPTION_MOVE_RESERVATION), false);
}
@VisibleForTesting
public void setPUOrderingPolicyUnderUtilizedPreemptionMoveReservation(
boolean allowMoveReservation) {
setBoolean(getOrderingPolicyGlobalConfigKey(
QUEUE_PRIORITY_UTILIZATION_ORDERING_POLICY,
UNDER_UTILIZED_PREEMPTION_MOVE_RESERVATION), allowMoveReservation);
}
}

View File

@ -62,10 +62,6 @@ public interface CapacitySchedulerContext {
ResourceCalculator getResourceCalculator();
Comparator<CSQueue> getNonPartitionedQueueComparator();
PartitionedQueueComparator getPartitionedQueueComparator();
FiCaSchedulerNode getNode(NodeId nodeId);
FiCaSchedulerApp getApplicationAttempt(ApplicationAttemptId attemptId);

View File

@ -75,9 +75,6 @@ public int compare(CSQueue q1, CSQueue q2) {
}
};
static final PartitionedQueueComparator PARTITIONED_QUEUE_COMPARATOR =
new PartitionedQueueComparator();
static class QueueHook {
public CSQueue hook(CSQueue queue) {
return queue;

View File

@ -194,7 +194,7 @@ protected void setupQueueConfigs(Resource clusterResource)
CapacitySchedulerConfiguration conf = csContext.getConfiguration();
setOrderingPolicy(
conf.<FiCaSchedulerApp>getOrderingPolicy(getQueuePath()));
conf.<FiCaSchedulerApp>getAppOrderingPolicy(getQueuePath()));
userLimit = conf.getUserLimit(getQueuePath());
userLimitFactor = conf.getUserLimitFactor(getQueuePath());
@ -296,7 +296,7 @@ protected void setupQueueConfigs(Resource clusterResource)
.toString() + "\n" + "reservationsContinueLooking = "
+ reservationsContinueLooking + "\n" + "preemptionDisabled = "
+ getPreemptionDisabled() + "\n" + "defaultAppPriorityPerQueue = "
+ defaultAppPriorityPerQueue);
+ defaultAppPriorityPerQueue + "\npriority = " + priority);
} finally {
writeLock.unlock();
}

View File

@ -18,18 +18,6 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -64,6 +52,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityDiagnosticConstant;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.AllocationState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.policy.QueueOrderingPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ContainerAllocationProposal;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ResourceCommitRequest;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.SchedulerContainer;
@ -73,29 +62,34 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSetUtils;
import org.apache.hadoop.yarn.util.resource.Resources;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@Private
@Evolving
public class ParentQueue extends AbstractCSQueue {
private static final Log LOG = LogFactory.getLog(ParentQueue.class);
protected final Set<CSQueue> childQueues;
protected final List<CSQueue> childQueues;
private final boolean rootQueue;
private final Comparator<CSQueue> nonPartitionedQueueComparator;
private final PartitionedQueueComparator partitionQueueComparator;
private volatile int numApplications;
private final CapacitySchedulerContext scheduler;
private final RecordFactory recordFactory =
RecordFactoryProvider.getRecordFactory(null);
private QueueOrderingPolicy queueOrderingPolicy;
public ParentQueue(CapacitySchedulerContext cs,
String queueName, CSQueue parent, CSQueue old) throws IOException {
super(cs, queueName, parent, old);
this.scheduler = cs;
this.nonPartitionedQueueComparator = cs.getNonPartitionedQueueComparator();
this.partitionQueueComparator = new PartitionedQueueComparator();
this.rootQueue = (parent == null);
float rawCapacity = cs.getConfiguration().getNonLabeledQueueCapacity(getQueuePath());
@ -107,7 +101,7 @@ public ParentQueue(CapacitySchedulerContext cs,
". Must be " + CapacitySchedulerConfiguration.MAXIMUM_CAPACITY_VALUE);
}
this.childQueues = new TreeSet<CSQueue>(nonPartitionedQueueComparator);
this.childQueues = new ArrayList<>();
setupQueueConfigs(cs.getClusterResource());
@ -116,7 +110,14 @@ public ParentQueue(CapacitySchedulerContext cs,
", fullname=" + getQueuePath());
}
void setupQueueConfigs(Resource clusterResource)
// returns what is configured queue ordering policy
private String getQueueOrderingPolicyConfigName() {
return queueOrderingPolicy == null ?
null :
queueOrderingPolicy.getConfigName();
}
protected void setupQueueConfigs(Resource clusterResource)
throws IOException {
try {
writeLock.lock();
@ -134,13 +135,22 @@ void setupQueueConfigs(Resource clusterResource)
}
}
// Initialize queue ordering policy
queueOrderingPolicy = csContext.getConfiguration().getQueueOrderingPolicy(
getQueuePath(), parent == null ?
null :
((ParentQueue) parent).getQueueOrderingPolicyConfigName());
queueOrderingPolicy.setQueues(childQueues);
LOG.info(queueName + ", capacity=" + this.queueCapacities.getCapacity()
+ ", absoluteCapacity=" + this.queueCapacities.getAbsoluteCapacity()
+ ", maxCapacity=" + this.queueCapacities.getMaximumCapacity()
+ ", absoluteMaxCapacity=" + this.queueCapacities
.getAbsoluteMaximumCapacity() + ", state=" + getState() + ", acls="
+ aclsString + ", labels=" + labelStrBuilder.toString() + "\n"
+ ", reservationsContinueLooking=" + reservationsContinueLooking);
+ ", reservationsContinueLooking=" + reservationsContinueLooking
+ ", orderingPolicy=" + getQueueOrderingPolicyConfigName()
+ ", priority=" + priority);
} finally {
writeLock.unlock();
}
@ -294,8 +304,8 @@ public void reinitialize(CSQueue newlyParsedQueue,
// Re-configure existing child queues and add new ones
// The CS has already checked to ensure all existing child queues are present!
Map<String, CSQueue> currentChildQueues = getQueues(childQueues);
Map<String, CSQueue> newChildQueues = getQueues(
Map<String, CSQueue> currentChildQueues = getQueuesMap(childQueues);
Map<String, CSQueue> newChildQueues = getQueuesMap(
newlyParsedParentQueue.childQueues);
for (Map.Entry<String, CSQueue> e : newChildQueues.entrySet()) {
String newChildQueueName = e.getKey();
@ -338,7 +348,7 @@ public void reinitialize(CSQueue newlyParsedQueue,
}
}
Map<String, CSQueue> getQueues(Set<CSQueue> queues) {
private Map<String, CSQueue> getQueuesMap(List<CSQueue> queues) {
Map<String, CSQueue> queuesMap = new HashMap<String, CSQueue>();
for (CSQueue queue : queues) {
queuesMap.put(queue.getQueueName(), queue);
@ -680,13 +690,7 @@ private ResourceLimits getResourceLimitsOfChild(CSQueue child,
private Iterator<CSQueue> sortAndGetChildrenAllocationIterator(
String partition) {
// Previously we keep a sorted list for default partition, it is not good
// when multi-threading scheduler is enabled, so to make a simpler code
// now re-sort queue every time irrespective to node partition.
partitionQueueComparator.setPartitionToLookAt(partition);
List<CSQueue> childrenList = new ArrayList<>(childQueues);
Collections.sort(childrenList, partitionQueueComparator);
return childrenList.iterator();
return queueOrderingPolicy.getAssignmentIterator(partition);
}
private CSAssignment assignContainersToChildQueues(Resource cluster,
@ -1083,4 +1087,8 @@ public void stopQueue() {
this.writeLock.unlock();
}
}
public QueueOrderingPolicy getQueueOrderingPolicy() {
return queueOrderingPolicy;
}
}

View File

@ -1,72 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import java.util.Comparator;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
public class PartitionedQueueComparator implements Comparator<CSQueue> {
private String partitionToLookAt = null;
public void setPartitionToLookAt(String partitionToLookAt) {
this.partitionToLookAt = partitionToLookAt;
}
@Override
public int compare(CSQueue q1, CSQueue q2) {
/*
* 1. Check accessible to given partition, if one queue accessible and
* the other not, accessible queue goes first.
*/
boolean q1Accessible =
q1.getAccessibleNodeLabels().contains(partitionToLookAt)
|| q1.getAccessibleNodeLabels().contains(RMNodeLabelsManager.ANY);
boolean q2Accessible =
q2.getAccessibleNodeLabels().contains(partitionToLookAt)
|| q2.getAccessibleNodeLabels().contains(RMNodeLabelsManager.ANY);
if (q1Accessible && !q2Accessible) {
return -1;
} else if (!q1Accessible && q2Accessible) {
return 1;
}
/*
*
* 2. When two queue has same accessibility, check who will go first:
* Now we simply compare their used resource on the partition to lookAt
*/
float used1 = q1.getQueueCapacities().getUsedCapacity(partitionToLookAt);
float used2 = q2.getQueueCapacities().getUsedCapacity(partitionToLookAt);
if (Math.abs(used1 - used2) < 1e-6) {
// When used capacity is same, compare their guaranteed-capacity
float cap1 = q1.getQueueCapacities().getCapacity(partitionToLookAt);
float cap2 = q2.getQueueCapacities().getCapacity(partitionToLookAt);
// when cap1 == cap2, we will compare queue's name
if (Math.abs(cap1 - cap2) < 1e-6) {
return q1.getQueueName().compareTo(q2.getQueueName());
}
return Float.compare(cap2, cap1);
}
return Float.compare(used1, used2);
}
}

View File

@ -0,0 +1,185 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.policy;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
/**
* For two queues with the same priority:
* - The queue with less relative used-capacity goes first - todays behavior.
* - The default priority for all queues is 0 and equal. So, we get todays
* behaviour at every level - the queue with the lowest used-capacity
* percentage gets the resources
*
* For two queues with different priorities:
* - Both the queues are under their guaranteed capacities: The queue with
* the higher priority gets resources
* - Both the queues are over or meeting their guaranteed capacities:
* The queue with the higher priority gets resources
* - One of the queues is over or meeting their guaranteed capacities and the
* other is under: The queue that is under its capacity guarantee gets the
* resources.
*/
public class PriorityUtilizationQueueOrderingPolicy implements QueueOrderingPolicy {
private List<CSQueue> queues;
private boolean respectPriority;
// This makes multiple threads can sort queues at the same time
// For different partitions.
private static ThreadLocal<String> partitionToLookAt =
new ThreadLocal<String>() {
@Override
protected String initialValue() {
return RMNodeLabelsManager.NO_LABEL;
}
};
/**
* Compare two queues with possibly different priority and assigned capacity,
* Will be used by preemption policy as well.
*
* @param relativeAssigned1 relativeAssigned1
* @param relativeAssigned2 relativeAssigned2
* @param priority1 p1
* @param priority2 p2
* @return compared result
*/
public static int compare(double relativeAssigned1, double relativeAssigned2,
int priority1, int priority2) {
if (priority1 == priority2) {
// The queue with less relative used-capacity goes first
return Double.compare(relativeAssigned1, relativeAssigned2);
} else {
// When priority is different:
if ((relativeAssigned1 < 1.0f && relativeAssigned2 < 1.0f) || (
relativeAssigned1 >= 1.0f && relativeAssigned2 >= 1.0f)) {
// When both the queues are under their guaranteed capacities,
// Or both the queues are over or meeting their guaranteed capacities
// queue with higher used-capacity goes first
return Integer.compare(priority2, priority1);
} else {
// Otherwise, when one of the queues is over or meeting their
// guaranteed capacities and the other is under: The queue that is
// under its capacity guarantee gets the resources.
return Double.compare(relativeAssigned1, relativeAssigned2);
}
}
}
/**
* Comparator that both looks at priority and utilization
*/
private class PriorityQueueComparator implements Comparator<CSQueue> {
@Override
public int compare(CSQueue q1, CSQueue q2) {
String p = partitionToLookAt.get();
int rc = compareQueueAccessToPartition(q1, q2, p);
if (0 != rc) {
return rc;
}
float used1 = q1.getQueueCapacities().getUsedCapacity(p);
float used2 = q2.getQueueCapacities().getUsedCapacity(p);
int p1 = 0;
int p2 = 0;
if (respectPriority) {
p1 = q1.getPriority().getPriority();
p2 = q2.getPriority().getPriority();
}
rc = PriorityUtilizationQueueOrderingPolicy.compare(used1, used2, p1, p2);
// For queue with same used ratio / priority, queue with higher configured
// capacity goes first
if (0 == rc) {
float abs1 = q1.getQueueCapacities().getAbsoluteCapacity(p);
float abs2 = q2.getQueueCapacities().getAbsoluteCapacity(p);
return Float.compare(abs2, abs1);
}
return rc;
}
private int compareQueueAccessToPartition(CSQueue q1, CSQueue q2, String partition) {
// Everybody has access to default partition
if (StringUtils.equals(partition, RMNodeLabelsManager.NO_LABEL)) {
return 0;
}
/*
* Check accessible to given partition, if one queue accessible and
* the other not, accessible queue goes first.
*/
boolean q1Accessible =
q1.getAccessibleNodeLabels() != null && q1.getAccessibleNodeLabels()
.contains(partition) || q1.getAccessibleNodeLabels().contains(
RMNodeLabelsManager.ANY);
boolean q2Accessible =
q2.getAccessibleNodeLabels() != null && q2.getAccessibleNodeLabels()
.contains(partition) || q2.getAccessibleNodeLabels().contains(
RMNodeLabelsManager.ANY);
if (q1Accessible && !q2Accessible) {
return -1;
} else if (!q1Accessible && q2Accessible) {
return 1;
}
return 0;
}
}
public PriorityUtilizationQueueOrderingPolicy(boolean respectPriority) {
this.respectPriority = respectPriority;
}
@Override
public void setQueues(List<CSQueue> queues) {
this.queues = queues;
}
@Override
public Iterator<CSQueue> getAssignmentIterator(String partition) {
// Since partitionToLookAt is a thread local variable, and every time we
// copy and sort queues, so it's safe for multi-threading environment.
PriorityUtilizationQueueOrderingPolicy.partitionToLookAt.set(partition);
List<CSQueue> sortedQueue = new ArrayList<>(queues);
Collections.sort(sortedQueue, new PriorityQueueComparator());
return sortedQueue.iterator();
}
@Override
public String getConfigName() {
if (respectPriority) {
return CapacitySchedulerConfiguration.QUEUE_PRIORITY_UTILIZATION_ORDERING_POLICY;
} else{
return CapacitySchedulerConfiguration.QUEUE_UTILIZATION_ORDERING_POLICY;
}
}
}

View File

@ -0,0 +1,52 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.policy;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
import java.util.Iterator;
import java.util.List;
/**
* This will be used by
* {@link org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQueue}
* to decide allocation ordering of child queues.
*/
public interface QueueOrderingPolicy {
void setQueues(List<CSQueue> queues);
/**
* Return an iterator over the collection of CSQueues which orders
* them for container assignment.
*
* Please note that, to avoid queue's set updated during sorting / iterating.
* Caller need to make sure parent queue's read lock is properly acquired.
*
* @param partition nodePartition
*
* @return iterator of queues to allocate
*/
Iterator<CSQueue> getAssignmentIterator(String partition);
/**
* Returns configuration name (which will be used to set ordering policy
* @return configuration name
*/
String getConfigName();
}

View File

@ -44,6 +44,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerFinishedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerReservedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
@ -1102,4 +1103,85 @@ public void removedToBeRemovedIncreaseRequests() {
}
}
}
/**
* Move reservation from one node to another
* Comparing to unreserve container on source node and reserve a new
* container on target node. This method will not create new RMContainer
* instance. And this operation is atomic.
*
* @param reservedContainer to be moved reserved container
* @param sourceNode source node
* @param targetNode target node
*
* @return succeeded or not
*/
public boolean moveReservation(RMContainer reservedContainer,
FiCaSchedulerNode sourceNode, FiCaSchedulerNode targetNode) {
try {
writeLock.lock();
if (!sourceNode.getPartition().equals(targetNode.getPartition())) {
if (LOG.isDebugEnabled()) {
LOG.debug(
"Failed to move reservation, two nodes are in different partition");
}
return false;
}
// Update reserved container to node map
Map<NodeId, RMContainer> map = reservedContainers.get(
reservedContainer.getReservedSchedulerKey());
if (null == map) {
if (LOG.isDebugEnabled()) {
LOG.debug("Cannot find reserved container map.");
}
return false;
}
// Check if reserved container changed
if (sourceNode.getReservedContainer() != reservedContainer) {
if (LOG.isDebugEnabled()) {
LOG.debug("To-be-moved container already updated.");
}
return false;
}
// Check if target node is empty, acquires lock of target node to make sure
// reservation happens transactional
synchronized (targetNode){
if (targetNode.getReservedContainer() != null) {
if (LOG.isDebugEnabled()) {
LOG.debug("Target node is already occupied before moving");
}
}
try {
targetNode.reserveResource(this,
reservedContainer.getReservedSchedulerKey(), reservedContainer);
} catch (IllegalStateException e) {
if (LOG.isDebugEnabled()) {
LOG.debug("Reserve on target node failed, e=", e);
}
return false;
}
// Set source node's reserved container to null
sourceNode.setReservedContainer(null);
map.remove(sourceNode.getNodeID());
// Update reserved container
reservedContainer.handle(
new RMContainerReservedEvent(reservedContainer.getContainerId(),
reservedContainer.getReservedResource(), targetNode.getNodeID(),
reservedContainer.getReservedSchedulerKey()));
// Add to target node
map.put(targetNode.getNodeID(), reservedContainer);
return true;
}
} finally {
writeLock.unlock();
}
}
}

View File

@ -36,6 +36,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.policy.QueueOrderingPolicy;
import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
@ -62,6 +63,7 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
@ -477,6 +479,14 @@ private Resource parseResourceFromString(String p) {
* -B...
* </pre>
* ";" splits queues, and there should no empty lines, no extra spaces
*
* For each queue, it has configurations to specify capacities (to each
* partition), format is:
* <pre>
* -<queueName> (<labelName1>=[guaranteed max used pending], \
* <labelName2>=[guaranteed max used pending])
* {key1=value1,key2=value2}; // Additional configs
* </pre>
*/
@SuppressWarnings({ "unchecked", "rawtypes" })
private ParentQueue mockQueueHierarchy(String queueExprs) {
@ -492,6 +502,10 @@ private ParentQueue mockQueueHierarchy(String queueExprs) {
queue = parentQueue;
List<CSQueue> children = new ArrayList<CSQueue>();
when(parentQueue.getChildQueues()).thenReturn(children);
QueueOrderingPolicy policy = mock(QueueOrderingPolicy.class);
when(policy.getConfigName()).thenReturn(
CapacitySchedulerConfiguration.QUEUE_PRIORITY_UTILIZATION_ORDERING_POLICY);
when(parentQueue.getQueueOrderingPolicy()).thenReturn(policy);
} else {
LeafQueue leafQueue = mock(LeafQueue.class);
final TreeSet<FiCaSchedulerApp> apps = new TreeSet<>(
@ -626,10 +640,57 @@ private void setupQueue(CSQueue queue, String q, String[] queueExprArray,
when(queue.getPreemptionDisabled()).thenReturn(
conf.getPreemptionDisabled(queuePath, false));
// Setup other queue configurations
Map<String, String> otherConfigs = getOtherConfigurations(
queueExprArray[idx]);
if (otherConfigs.containsKey("priority")) {
when(queue.getPriority()).thenReturn(
Priority.newInstance(Integer.valueOf(otherConfigs.get("priority"))));
} else {
// set queue's priority to 0 by default
when(queue.getPriority()).thenReturn(Priority.newInstance(0));
}
// Setup disable preemption of queues
if (otherConfigs.containsKey("disable_preemption")) {
when(queue.getPreemptionDisabled()).thenReturn(
Boolean.valueOf(otherConfigs.get("disable_preemption")));
}
nameToCSQueues.put(queueName, queue);
when(cs.getQueue(eq(queueName))).thenReturn(queue);
}
@SuppressWarnings("unchecked")
/**
* Get additional queue's configurations
* @param queueExpr queue expr
* @return maps of configs
*/
private Map<String, String> getOtherConfigurations(String queueExpr) {
if (queueExpr.contains("{")) {
int left = queueExpr.indexOf('{');
int right = queueExpr.indexOf('}');
if (right > left) {
Map<String, String> configs = new HashMap<>();
String subStr = queueExpr.substring(left + 1, right);
for (String kv : subStr.split(",")) {
if (kv.contains("=")) {
String key = kv.substring(0, kv.indexOf("="));
String value = kv.substring(kv.indexOf("=") + 1);
configs.put(key, value);
}
}
return configs;
}
}
return Collections.EMPTY_MAP;
}
/**
* Level of a queue is how many "-" at beginning, root's level is 0
*/
@ -740,6 +801,10 @@ public void checkPendingResource(CSQueue queue, String partition, int pending) {
Assert.assertEquals(pending, ru.getPending(partition).getMemorySize());
}
public void checkPriority(CSQueue queue, int expectedPriority) {
Assert.assertEquals(expectedPriority, queue.getPriority().getPriority());
}
public void checkReservedResource(CSQueue queue, String partition, int reserved) {
ResourceUsage ru = queue.getQueueResourceUsage();
Assert.assertEquals(reserved, ru.getReserved(partition).getMemorySize());

View File

@ -0,0 +1,361 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import static org.mockito.Matchers.argThat;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
public class TestPreemptionForQueueWithPriorities
extends ProportionalCapacityPreemptionPolicyMockFramework {
@Before
public void setup() {
super.setup();
policy = new ProportionalCapacityPreemptionPolicy(rmContext, cs, mClock);
}
@Test
public void testPreemptionForHighestPriorityUnderutilizedQueue()
throws IOException {
/**
* The simplest test of queue with priorities, Queue structure is:
*
* <pre>
* root
* / | \
* a b c
* </pre>
*
* For priorities
* - a=1
* - b/c=2
*
* So c will preempt more resource from a, till a reaches guaranteed
* resource.
*/
String labelsConfig = "=100,true"; // default partition
String nodesConfig = "n1="; // only one node
String queuesConfig =
// guaranteed,max,used,pending
"root(=[100 100 100 100]);" + //root
"-a(=[30 100 40 50]){priority=1};" + // a
"-b(=[30 100 59 50]){priority=2};" + // b
"-c(=[40 100 1 25]){priority=2}"; // c
String appsConfig =
//queueName\t(priority,resource,host,expression,#repeat,reserved)
"a\t(1,1,n1,,40,false);" + // app1 in a
"b\t(1,1,n1,,59,false);" + // app2 in b
"c\t(1,1,n1,,1,false);"; // app3 in c
buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
policy.editSchedule();
// 10 preempted from app1, 15 preempted from app2, and nothing preempted
// from app3
verify(mDisp, times(10)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(1))));
verify(mDisp, times(15)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(2))));
verify(mDisp, never()).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(3))));
}
@Test
public void testPreemptionForLowestPriorityUnderutilizedQueue()
throws IOException {
/**
* Similar to above, make sure we can still make sure less utilized queue
* can get resource first regardless of priority.
*
* Queue structure is:
*
* <pre>
* root
* / | \
* a b c
* </pre>
*
* For priorities
* - a=1
* - b=2
* - c=0
*
* So c will preempt more resource from a, till a reaches guaranteed
* resource.
*/
String labelsConfig = "=100,true"; // default partition
String nodesConfig = "n1="; // only one node
String queuesConfig =
// guaranteed,max,used,pending
"root(=[100 100 100 100]);" + //root
"-a(=[30 100 40 50]){priority=1};" + // a
"-b(=[30 100 59 50]){priority=2};" + // b
"-c(=[40 100 1 25]){priority=0}"; // c
String appsConfig =
//queueName\t(priority,resource,host,expression,#repeat,reserved)
"a\t(1,1,n1,,40,false);" + // app1 in a
"b\t(1,1,n1,,59,false);" + // app2 in b
"c\t(1,1,n1,,1,false);"; // app3 in c
buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
policy.editSchedule();
// 10 preempted from app1, 15 preempted from app2, and nothing preempted
// from app3
verify(mDisp, times(10)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(1))));
verify(mDisp, times(15)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(2))));
verify(mDisp, never()).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(3))));
}
@Test
public void testPreemptionWontHappenBetweenSatisfiedQueues()
throws IOException {
/**
* No preemption happen if a queue is already satisfied, regardless of
* priority
*
* Queue structure is:
*
* <pre>
* root
* / | \
* a b c
* </pre>
*
* For priorities
* - a=1
* - b=1
* - c=2
*
* When c is satisfied, it will not preempt any resource from other queues
*/
String labelsConfig = "=100,true"; // default partition
String nodesConfig = "n1="; // only one node
String queuesConfig =
// guaranteed,max,used,pending
"root(=[100 100 100 100]);" + //root
"-a(=[30 100 0 0]){priority=1};" + // a
"-b(=[30 100 40 50]){priority=1};" + // b
"-c(=[40 100 60 25]){priority=2}"; // c
String appsConfig =
//queueName\t(priority,resource,host,expression,#repeat,reserved)
"b\t(1,1,n1,,40,false);" + // app1 in b
"c\t(1,1,n1,,60,false)"; // app2 in c
buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
policy.editSchedule();
// Nothing preempted
verify(mDisp, never()).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(1))));
verify(mDisp, never()).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(2))));
}
@Test
public void testPreemptionForMultipleQueuesInTheSamePriorityBuckets()
throws IOException {
/**
* When a cluster has different priorities, each priority has multiple
* queues, preemption policy should try to balance resource between queues
* with same priority by ratio of their capacities
*
* Queue structure is:
*
* <pre>
* root
* - a (capacity=10), p=1
* - b (capacity=15), p=1
* - c (capacity=20), p=2
* - d (capacity=25), p=2
* - e (capacity=30), p=2
* </pre>
*/
String labelsConfig = "=100,true"; // default partition
String nodesConfig = "n1="; // only one node
String queuesConfig =
// guaranteed,max,used,pending
"root(=[100 100 100 100]);" + //root
"-a(=[10 100 35 50]){priority=1};" + // a
"-b(=[15 100 25 50]){priority=1};" + // b
"-c(=[20 100 39 50]){priority=2};" + // c
"-d(=[25 100 0 0]){priority=2};" + // d
"-e(=[30 100 1 99]){priority=2}"; // e
String appsConfig =
//queueName\t(priority,resource,host,expression,#repeat,reserved)
"a\t(1,1,n1,,35,false);" + // app1 in a
"b\t(1,1,n1,,25,false);" + // app2 in b
"c\t(1,1,n1,,39,false);" + // app3 in c
"e\t(1,1,n1,,1,false)"; // app4 in e
buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
policy.editSchedule();
// 23 preempted from app1, 6 preempted from app2, and nothing preempted
// from app3/app4
// (After preemption, a has 35 - 23 = 12, b has 25 - 6 = 19, so a:b after
// preemption is 1.58, close to 1.50)
verify(mDisp, times(23)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(1))));
verify(mDisp, times(6)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(2))));
verify(mDisp, never()).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(3))));
verify(mDisp, never()).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(4))));
}
@Test
public void testPreemptionForPriorityAndDisablePreemption()
throws IOException {
/**
* When a cluster has different priorities, each priority has multiple
* queues, preemption policy should try to balance resource between queues
* with same priority by ratio of their capacities.
*
* But also we need to make sure preemption disable will be honered
* regardless of priority.
*
* Queue structure is:
*
* <pre>
* root
* - a (capacity=10), p=1
* - b (capacity=15), p=1
* - c (capacity=20), p=2
* - d (capacity=25), p=2
* - e (capacity=30), p=2
* </pre>
*/
String labelsConfig = "=100,true"; // default partition
String nodesConfig = "n1="; // only one node
String queuesConfig =
// guaranteed,max,used,pending
"root(=[100 100 100 100]);" + //root
"-a(=[10 100 35 50]){priority=1,disable_preemption=true};" + // a
"-b(=[15 100 25 50]){priority=1};" + // b
"-c(=[20 100 39 50]){priority=2};" + // c
"-d(=[25 100 0 0]){priority=2};" + // d
"-e(=[30 100 1 99]){priority=2}"; // e
String appsConfig =
//queueName\t(priority,resource,host,expression,#repeat,reserved)
"a\t(1,1,n1,,35,false);" + // app1 in a
"b\t(1,1,n1,,25,false);" + // app2 in b
"c\t(1,1,n1,,39,false);" + // app3 in c
"e\t(1,1,n1,,1,false)"; // app4 in e
buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
policy.editSchedule();
// We suppose to preempt some resource from A, but now since queueA
// disables preemption, so we need to preempt some resource from B and
// some from C even if C has higher priority than A
verify(mDisp, never()).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(1))));
verify(mDisp, times(9)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(2))));
verify(mDisp, times(19)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(3))));
verify(mDisp, never()).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(4))));
}
@Test
public void testPriorityPreemptionForHierarchicalOfQueues()
throws IOException {
/**
* When a queue has multiple hierarchy and different priorities:
*
* <pre>
* root
* - a (capacity=30), p=1
* - a1 (capacity=40), p=1
* - a2 (capacity=60), p=1
* - b (capacity=30), p=1
* - b1 (capacity=50), p=1
* - b1 (capacity=50), p=2
* - c (capacity=40), p=2
* </pre>
*/
String labelsConfig = "=100,true"; // default partition
String nodesConfig = "n1="; // only one node
String queuesConfig =
// guaranteed,max,used,pending
"root(=[100 100 100 100]);" + //root
"-a(=[30 100 40 50]){priority=1};" + // a
"--a1(=[12 100 20 50]){priority=1};" + // a1
"--a2(=[18 100 20 50]){priority=1};" + // a2
"-b(=[30 100 59 50]){priority=1};" + // b
"--b1(=[15 100 30 50]){priority=1};" + // b1
"--b2(=[15 100 29 50]){priority=2};" + // b2
"-c(=[40 100 1 30]){priority=1}"; // c
String appsConfig =
//queueName\t(priority,resource,host,expression,#repeat,reserved)
"a1\t(1,1,n1,,20,false);" + // app1 in a1
"a2\t(1,1,n1,,20,false);" + // app2 in a2
"b1\t(1,1,n1,,30,false);" + // app3 in b1
"b2\t(1,1,n1,,29,false);" + // app4 in b2
"c\t(1,1,n1,,29,false)"; // app5 in c
buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
policy.editSchedule();
// Preemption should first divide capacities between a / b, and b2 should
// get less preemption than b1 (because b2 has higher priority)
verify(mDisp, times(5)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(1))));
verify(mDisp, never()).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(2))));
verify(mDisp, times(15)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(3))));
verify(mDisp, times(9)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(4))));
}
}

View File

@ -35,6 +35,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.policy.QueueOrderingPolicy;
import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
@ -219,7 +220,9 @@ public void testProportionalPreemption() {
};
ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
policy.editSchedule();
verify(mDisp, times(16)).handle(argThat(new IsPreemptionRequestFor(appA)));
// A will preempt guaranteed-allocated.
verify(mDisp, times(10)).handle(argThat(new IsPreemptionRequestFor(appA)));
}
@Test
@ -587,8 +590,8 @@ public void testOverCapacityImbalance() {
};
ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
policy.editSchedule();
// correct imbalance between over-capacity queues
verify(mDisp, times(5)).handle(argThat(new IsPreemptionRequestFor(appA)));
// Will not preempt for over capacity queues
verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appA)));
}
@Test
@ -701,7 +704,7 @@ public void testZeroGuar() {
public void testZeroGuarOverCap() {
int[][] qData = new int[][] {
// / A B C D E F
{ 200, 100, 0, 99, 0, 100, 100 }, // abs
{ 200, 100, 0, 100, 0, 100, 100 }, // abs
{ 200, 200, 200, 200, 200, 200, 200 }, // maxCap
{ 170, 170, 60, 20, 90, 0, 0 }, // used
{ 85, 50, 30, 10, 10, 20, 20 }, // pending
@ -712,14 +715,14 @@ public void testZeroGuarOverCap() {
};
ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
policy.editSchedule();
// we verify both that C has priority on B and D (has it has >0 guarantees)
// and that B and D are force to share their over capacity fairly (as they
// are both zero-guarantees) hence D sees some of its containers preempted
verify(mDisp, times(15)).handle(argThat(new IsPreemptionRequestFor(appC)));
// No preemption should happen because zero guaranteed queues should be
// treated as always satisfied, they should not preempt from each other.
verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appA)));
verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appB)));
verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appC)));
verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appD)));
}
@Test
public void testHierarchicalLarge() {
int[][] qData = new int[][] {
@ -1231,6 +1234,13 @@ ParentQueue mockParentQueue(ParentQueue p, int subqueues,
when(pq.getChildQueues()).thenReturn(cqs);
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
when(pq.getReadLock()).thenReturn(lock.readLock());
// Ordering policy
QueueOrderingPolicy policy = mock(QueueOrderingPolicy.class);
when(policy.getConfigName()).thenReturn(
CapacitySchedulerConfiguration.QUEUE_PRIORITY_UTILIZATION_ORDERING_POLICY);
when(pq.getQueueOrderingPolicy()).thenReturn(policy);
when(pq.getPriority()).thenReturn(Priority.newInstance(0));
for (int i = 0; i < subqueues; ++i) {
pqs.add(pq);
}
@ -1301,6 +1311,7 @@ public Object answer(InvocationOnMock invocation) {
}
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
when(lq.getReadLock()).thenReturn(lock.readLock());
when(lq.getPriority()).thenReturn(Priority.newInstance(0));
p.getChildQueues().add(lq);
return lq;
}

View File

@ -95,7 +95,7 @@ public void testNodePartitionPreemptionRespectGuaranteedCapacity()
}
@Test
public void testNodePartitionPreemptionRespectMaximumCapacity()
public void testNodePartitionPreemptionNotHappenBetweenSatisfiedQueues()
throws IOException {
/**
* Queue structure is:
@ -114,8 +114,8 @@ public void testNodePartitionPreemptionRespectMaximumCapacity()
* 2 apps in cluster.
* app1 in b and app2 in c.
*
* app1 uses 90x, and app2 use 10x. After preemption, app2 will preempt 10x
* from app1 because of max capacity.
* app1 uses 90x, and app2 use 10x. We don't expect preemption happen
* between them because all of them are satisfied
*/
String labelsConfig =
"=100,true;" + // default partition
@ -139,9 +139,8 @@ public void testNodePartitionPreemptionRespectMaximumCapacity()
buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
policy.editSchedule();
// 30 preempted from app1, 30 preempted from app4, and nothing preempted
// from app2/app3
verify(mDisp, times(20)).handle(
// No preemption happens
verify(mDisp, never()).handle(
argThat(new IsPreemptionRequestFor(getAppAttemptId(1))));
verify(mDisp, never()).handle(
argThat(new IsPreemptionRequestFor(getAppAttemptId(2))));

View File

@ -46,8 +46,8 @@ public void testBuilder() throws Exception {
"root(=[200 200 100 100],red=[100 100 100 100],blue=[200 200 200 200]);" + //root
"-a(=[100 200 100 100],red=[0 0 0 0],blue=[200 200 200 200]);" + // a
"--a1(=[50 100 50 100],red=[0 0 0 0],blue=[100 200 200 0]);" + // a1
"--a2(=[50 200 50 0],red=[0 0 0 0],blue=[100 200 0 200]);" + // a2
"-b(=[100 200 0 0],red=[100 100 100 100],blue=[0 0 0 0])";
"--a2(=[50 200 50 0],red=[0 0 0 0],blue=[100 200 0 200]){priority=2};" + // a2
"-b(=[100 200 0 0],red=[100 100 100 100],blue=[0 0 0 0]){priority=1,disable_preemption=true}";
String appsConfig=
//queueName\t(priority,resource,host,expression,#repeat,reserved)
// app1 in a1, , 50 in n2 (reserved), 50 in n2 (allocated)
@ -75,6 +75,7 @@ public void testBuilder() throws Exception {
checkPendingResource(cs.getQueue("root"), "red", 100);
checkAbsCapacities(cs.getQueue("root"), "blue", 1f, 1f, 1f);
checkPendingResource(cs.getQueue("root"), "blue", 200);
checkPriority(cs.getQueue("root"), 0); // default
// a
checkAbsCapacities(cs.getQueue("a"), "", 0.5f, 1f, 0.5f);
@ -83,6 +84,7 @@ public void testBuilder() throws Exception {
checkPendingResource(cs.getQueue("a"), "red", 0);
checkAbsCapacities(cs.getQueue("a"), "blue", 1f, 1f, 1f);
checkPendingResource(cs.getQueue("a"), "blue", 200);
checkPriority(cs.getQueue("a"), 0); // default
// a1
checkAbsCapacities(cs.getQueue("a1"), "", 0.25f, 0.5f, 0.25f);
@ -91,6 +93,7 @@ public void testBuilder() throws Exception {
checkPendingResource(cs.getQueue("a1"), "red", 0);
checkAbsCapacities(cs.getQueue("a1"), "blue", 0.5f, 1f, 1f);
checkPendingResource(cs.getQueue("a1"), "blue", 0);
checkPriority(cs.getQueue("a1"), 0); // default
// a2
checkAbsCapacities(cs.getQueue("a2"), "", 0.25f, 1f, 0.25f);
@ -99,14 +102,18 @@ public void testBuilder() throws Exception {
checkPendingResource(cs.getQueue("a2"), "red", 0);
checkAbsCapacities(cs.getQueue("a2"), "blue", 0.5f, 1f, 0f);
checkPendingResource(cs.getQueue("a2"), "blue", 200);
checkPriority(cs.getQueue("a2"), 2);
Assert.assertFalse(cs.getQueue("a2").getPreemptionDisabled());
// b1
// b
checkAbsCapacities(cs.getQueue("b"), "", 0.5f, 1f, 0f);
checkPendingResource(cs.getQueue("b"), "", 0);
checkAbsCapacities(cs.getQueue("b"), "red", 1f, 1f, 1f);
checkPendingResource(cs.getQueue("b"), "red", 100);
checkAbsCapacities(cs.getQueue("b"), "blue", 0f, 0f, 0f);
checkPendingResource(cs.getQueue("b"), "blue", 0);
checkPriority(cs.getQueue("b"), 1);
Assert.assertTrue(cs.getQueue("b").getPreemptionDisabled());
// Check ignored partitioned containers in queue
Assert.assertEquals(100, ((LeafQueue) cs.getQueue("a1"))

View File

@ -46,7 +46,7 @@ public class CapacitySchedulerPreemptionTestBase {
final int GB = 1024;
Configuration conf;
CapacitySchedulerConfiguration conf;
RMNodeLabelsManager mgr;
@ -54,13 +54,15 @@ public class CapacitySchedulerPreemptionTestBase {
@Before
void setUp() throws Exception {
conf = new YarnConfiguration();
conf = new CapacitySchedulerConfiguration();
conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
ResourceScheduler.class);
conf.setBoolean(YarnConfiguration.RM_SCHEDULER_ENABLE_MONITORS, true);
conf.setClass(YarnConfiguration.RM_SCHEDULER_MONITOR_POLICIES,
ProportionalCapacityPreemptionPolicy.class, SchedulingEditPolicy.class);
conf = TestUtils.getConfigurationWithMultipleQueues(this.conf);
conf = (CapacitySchedulerConfiguration) TestUtils
.getConfigurationWithMultipleQueues(this.conf);
conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, 100 * GB);
// Set preemption related configurations
conf.setInt(CapacitySchedulerConfiguration.PREEMPTION_WAIT_TIME_BEFORE_KILL,
@ -146,4 +148,18 @@ public void waitNumberOfLiveContainersOnNodeFromApp(FiCaSchedulerNode node,
Assert.fail();
}
public void checkNumberOfPreemptionCandidateFromApp(
ProportionalCapacityPreemptionPolicy policy, int expected,
ApplicationAttemptId attemptId) {
int total = 0;
for (RMContainer rmContainer : policy.getToPreemptContainers().keySet()) {
if (rmContainer.getApplicationAttemptId().equals(attemptId)) {
++ total;
}
}
Assert.assertEquals(expected, total);
}
}

View File

@ -110,9 +110,6 @@ public void setUp() throws IOException {
thenReturn(Resources.createResource(16*GB, 32));
when(csContext.getClusterResource()).
thenReturn(Resources.createResource(10 * 16 * GB, 10 * 32));
when(csContext.getNonPartitionedQueueComparator()).
thenReturn(
CapacitySchedulerQueueManager.NON_PARTITIONED_QUEUE_COMPARATOR);
when(csContext.getResourceCalculator()).
thenReturn(resourceCalculator);
when(csContext.getRMContext()).thenReturn(rmContext);
@ -276,9 +273,6 @@ public void testLimitsComputation() throws Exception {
thenReturn(Resources.createResource(GB, 1));
when(csContext.getMaximumResourceCapability()).
thenReturn(Resources.createResource(16*GB, 16));
when(csContext.getNonPartitionedQueueComparator()).
thenReturn(
CapacitySchedulerQueueManager.NON_PARTITIONED_QUEUE_COMPARATOR);
when(csContext.getResourceCalculator()).thenReturn(resourceCalculator);
when(csContext.getRMContext()).thenReturn(rmContext);
when(csContext.getPreemptionManager()).thenReturn(new PreemptionManager());
@ -581,9 +575,6 @@ public void testHeadroom() throws Exception {
thenReturn(Resources.createResource(GB));
when(csContext.getMaximumResourceCapability()).
thenReturn(Resources.createResource(16*GB));
when(csContext.getNonPartitionedQueueComparator()).
thenReturn(
CapacitySchedulerQueueManager.NON_PARTITIONED_QUEUE_COMPARATOR);
when(csContext.getResourceCalculator()).thenReturn(resourceCalculator);
when(csContext.getRMContext()).thenReturn(rmContext);

View File

@ -594,9 +594,6 @@ public void testHeadroom() throws Exception {
.thenReturn(Resources.createResource(GB));
when(csContext.getMaximumResourceCapability())
.thenReturn(Resources.createResource(16 * GB));
when(csContext.getNonPartitionedQueueComparator())
.thenReturn(
CapacitySchedulerQueueManager.NON_PARTITIONED_QUEUE_COMPARATOR);
when(csContext.getResourceCalculator()).thenReturn(resourceCalculator);
RMContext rmContext = TestUtils.getMockRMContext();
RMContext spyRMContext = spy(rmContext);

View File

@ -22,22 +22,26 @@
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingEditPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Set;
public class TestCapacitySchedulerSurgicalPreemption
extends CapacitySchedulerPreemptionTestBase {
@ -167,8 +171,7 @@ public void testSurgicalPreemptionWithAvailableResource()
*
* 1) Two nodes (n1/n2) in the cluster, each of them has 20G.
*
* 2) app1 submit to queue-a first, it asked 38 * 1G containers
* We will allocate 20 on n1 and 19 on n2.
* 2) app1 submit to queue-b, asks for 1G * 5
*
* 3) app2 submit to queue-c, ask for one 4G container (for AM)
*
@ -243,4 +246,569 @@ public void testSurgicalPreemptionWithAvailableResource()
rm1.close();
}
@Test(timeout = 60000)
public void testPriorityPreemptionWhenAllQueuesAreBelowGuaranteedCapacities()
throws Exception {
/**
* Test case: Submit two application (app1/app2) to different queues, queue
* structure:
*
* <pre>
* Root
* / | \
* a b c
* 10 20 70
* </pre>
*
* 1) Two nodes (n1/n2) in the cluster, each of them has 20G.
*
* 2) app1 submit to queue-b first, it asked 6 * 1G containers
* We will allocate 4 on n1 (including AM) and 3 on n2.
*
* 3) app2 submit to queue-c, ask for one 18G container (for AM)
*
* After preemption, we should expect:
* Preempt 3 containers from app1 and AM of app2 successfully allocated.
*/
conf.setPUOrderingPolicyUnderUtilizedPreemptionEnabled(true);
conf.setPUOrderingPolicyUnderUtilizedPreemptionDelay(1000);
conf.setQueueOrderingPolicy(CapacitySchedulerConfiguration.ROOT,
CapacitySchedulerConfiguration.QUEUE_PRIORITY_UTILIZATION_ORDERING_POLICY);
// Queue c has higher priority than a/b
conf.setQueuePriority(CapacitySchedulerConfiguration.ROOT + ".c", 1);
MockRM rm1 = new MockRM(conf);
rm1.getRMContext().setNodeLabelManager(mgr);
rm1.start();
MockNM nm1 = rm1.registerNode("h1:1234", 20 * GB);
MockNM nm2 = rm1.registerNode("h2:1234", 20 * GB);
CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId());
// launch an app to queue, AM container should be launched in nm1
RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "b");
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
am1.allocate("*", 1 * GB, 6, new ArrayList<ContainerId>());
// Do allocation for node1/node2
for (int i = 0; i < 3; i++) {
cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
}
// App1 should have 7 containers now, so the abs-used-cap of b is
// 7 / 40 = 17.5% < 20% (guaranteed)
FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt(
am1.getApplicationAttemptId());
Assert.assertEquals(7, schedulerApp1.getLiveContainers().size());
// 4 from n1 and 3 from n2
waitNumberOfLiveContainersOnNodeFromApp(cs.getNode(rmNode1.getNodeID()),
am1.getApplicationAttemptId(), 4);
waitNumberOfLiveContainersOnNodeFromApp(cs.getNode(rmNode2.getNodeID()),
am1.getApplicationAttemptId(), 3);
// Submit app2 to queue-c and asks for a 1G container for AM
RMApp app2 = rm1.submitApp(18 * GB, "app", "user", null, "c");
FiCaSchedulerApp schedulerApp2 = cs.getApplicationAttempt(
ApplicationAttemptId.newInstance(app2.getApplicationId(), 1));
while (cs.getNode(rmNode1.getNodeID()).getReservedContainer() == null) {
cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
Thread.sleep(10);
}
// Call editSchedule immediately: containers are not selected
ProportionalCapacityPreemptionPolicy editPolicy =
(ProportionalCapacityPreemptionPolicy) getSchedulingEditPolicy(rm1);
editPolicy.editSchedule();
Assert.assertEquals(0, editPolicy.getToPreemptContainers().size());
// Sleep the timeout interval, we should be able to see containers selected
Thread.sleep(1000);
editPolicy.editSchedule();
Assert.assertEquals(2, editPolicy.getToPreemptContainers().size());
// Call editSchedule again: selected containers are killed, and new AM
// container launched
editPolicy.editSchedule();
// Do allocation till reserved container allocated
while (cs.getNode(rmNode1.getNodeID()).getReservedContainer() != null) {
cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
Thread.sleep(10);
}
waitNumberOfLiveContainersFromApp(schedulerApp2, 1);
rm1.close();
}
@Test(timeout = 300000)
public void testPriorityPreemptionRequiresMoveReservation()
throws Exception {
/**
* Test case: Submit two application (app1/app2) to different queues, queue
* structure:
*
* <pre>
* Root
* / | \
* a b c
* 10 20 70
* </pre>
*
* 1) 3 nodes in the cluster, 10G for each
*
* 2) app1 submit to queue-b first, it asked 2G each,
* it can get 2G on n1 (AM), 2 * 2G on n2
*
* 3) app2 submit to queue-c, with 2G AM container (allocated on n3)
* app2 requires 9G resource, which will be reserved on n3
*
* We should expect container unreserved from n3 and allocated on n1/n2
*/
conf.setPUOrderingPolicyUnderUtilizedPreemptionEnabled(true);
conf.setPUOrderingPolicyUnderUtilizedPreemptionDelay(1000);
conf.setQueueOrderingPolicy(CapacitySchedulerConfiguration.ROOT,
CapacitySchedulerConfiguration.QUEUE_PRIORITY_UTILIZATION_ORDERING_POLICY);
conf.setPUOrderingPolicyUnderUtilizedPreemptionMoveReservation(true);
// Queue c has higher priority than a/b
conf.setQueuePriority(CapacitySchedulerConfiguration.ROOT + ".c", 1);
MockRM rm1 = new MockRM(conf);
rm1.getRMContext().setNodeLabelManager(mgr);
rm1.start();
MockNM nm1 = rm1.registerNode("h1:1234", 10 * GB);
MockNM nm2 = rm1.registerNode("h2:1234", 10 * GB);
MockNM nm3 = rm1.registerNode("h3:1234", 10 * GB);
CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId());
RMNode rmNode3 = rm1.getRMContext().getRMNodes().get(nm3.getNodeId());
// launch an app to queue, AM container should be launched in nm1
RMApp app1 = rm1.submitApp(2 * GB, "app", "user", null, "b");
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
am1.allocate("*", 2 * GB, 2, new ArrayList<ContainerId>());
// Do allocation for node2 twice
for (int i = 0; i < 2; i++) {
cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
}
FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt(
am1.getApplicationAttemptId());
Assert.assertEquals(3, schedulerApp1.getLiveContainers().size());
// 1 from n1 and 2 from n2
waitNumberOfLiveContainersOnNodeFromApp(cs.getNode(rmNode1.getNodeID()),
am1.getApplicationAttemptId(), 1);
waitNumberOfLiveContainersOnNodeFromApp(cs.getNode(rmNode2.getNodeID()),
am1.getApplicationAttemptId(), 2);
// Submit app2 to queue-c and asks for a 2G container for AM, on n3
RMApp app2 = rm1.submitApp(2 * GB, "app", "user", null, "c");
MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm3);
FiCaSchedulerApp schedulerApp2 = cs.getApplicationAttempt(
ApplicationAttemptId.newInstance(app2.getApplicationId(), 1));
// Asks 1 * 9G container
am2.allocate("*", 9 * GB, 1, new ArrayList<ContainerId>());
// Do allocation for node3 once
cs.handle(new NodeUpdateSchedulerEvent(rmNode3));
// Make sure container reserved on node3
Assert.assertNotNull(
cs.getNode(rmNode3.getNodeID()).getReservedContainer());
// Call editSchedule immediately: nothing happens
ProportionalCapacityPreemptionPolicy editPolicy =
(ProportionalCapacityPreemptionPolicy) getSchedulingEditPolicy(rm1);
editPolicy.editSchedule();
Assert.assertNotNull(
cs.getNode(rmNode3.getNodeID()).getReservedContainer());
// Sleep the timeout interval, we should be able to see reserved container
// moved to n2 (n1 occupied by AM)
Thread.sleep(1000);
editPolicy.editSchedule();
Assert.assertNull(
cs.getNode(rmNode3.getNodeID()).getReservedContainer());
Assert.assertNotNull(
cs.getNode(rmNode2.getNodeID()).getReservedContainer());
Assert.assertEquals(am2.getApplicationAttemptId(), cs.getNode(
rmNode2.getNodeID()).getReservedContainer().getApplicationAttemptId());
// Do it again, we should see containers marked to be preempt
editPolicy.editSchedule();
Assert.assertEquals(2, editPolicy.getToPreemptContainers().size());
// Call editSchedule again: selected containers are killed
editPolicy.editSchedule();
// Do allocation till reserved container allocated
while (schedulerApp2.getLiveContainers().size() < 2) {
cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
Thread.sleep(200);
}
waitNumberOfLiveContainersFromApp(schedulerApp1, 1);
rm1.close();
}
@Test(timeout = 60000)
public void testPriorityPreemptionOnlyTriggeredWhenDemandingQueueUnsatisfied()
throws Exception {
/**
* Test case: Submit two application (app1/app2) to different queues, queue
* structure:
*
* <pre>
* Root
* / | \
* a b c
* 10 20 70
* </pre>
*
* 1) 10 nodes (n0-n9) in the cluster, each of them has 10G.
*
* 2) app1 submit to queue-b first, it asked 8 * 1G containers
* We will allocate 1 container on each of n0-n10
*
* 3) app2 submit to queue-c, ask for 10 * 10G containers (including AM)
*
* After preemption, we should expect:
* Preempt 7 containers from app1 and usage of app2 is 70%
*/
conf.setPUOrderingPolicyUnderUtilizedPreemptionEnabled(true);
conf.setPUOrderingPolicyUnderUtilizedPreemptionDelay(1000);
conf.setQueueOrderingPolicy(CapacitySchedulerConfiguration.ROOT,
CapacitySchedulerConfiguration.QUEUE_PRIORITY_UTILIZATION_ORDERING_POLICY);
// Queue c has higher priority than a/b
conf.setQueuePriority(CapacitySchedulerConfiguration.ROOT + ".c", 1);
MockRM rm1 = new MockRM(conf);
rm1.getRMContext().setNodeLabelManager(mgr);
rm1.start();
MockNM[] mockNMs = new MockNM[10];
for (int i = 0; i < 10; i++) {
mockNMs[i] = rm1.registerNode("h" + i + ":1234", 10 * GB);
}
CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
RMNode[] rmNodes = new RMNode[10];
for (int i = 0; i < 10; i++) {
rmNodes[i] = rm1.getRMContext().getRMNodes().get(mockNMs[i].getNodeId());
}
// launch an app to queue, AM container should be launched in nm1
RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "b");
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, mockNMs[0]);
am1.allocate("*", 1 * GB, 8, new ArrayList<ContainerId>());
// Do allocation for nm1-nm8
for (int i = 1; i < 9; i++) {
cs.handle(new NodeUpdateSchedulerEvent(rmNodes[i]));
}
// App1 should have 9 containers now, so the abs-used-cap of b is 9%
FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt(
am1.getApplicationAttemptId());
Assert.assertEquals(9, schedulerApp1.getLiveContainers().size());
for (int i = 0; i < 9; i++) {
waitNumberOfLiveContainersOnNodeFromApp(cs.getNode(rmNodes[i].getNodeID()),
am1.getApplicationAttemptId(), 1);
}
// Submit app2 to queue-c and asks for a 10G container for AM
// Launch AM in NM9
RMApp app2 = rm1.submitApp(10 * GB, "app", "user", null, "c");
MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, mockNMs[9]);
FiCaSchedulerApp schedulerApp2 = cs.getApplicationAttempt(
ApplicationAttemptId.newInstance(app2.getApplicationId(), 1));
// Ask 10 * 10GB containers
am2.allocate("*", 10 * GB, 10, new ArrayList<ContainerId>());
// Do allocation for all nms
for (int i = 1; i < 10; i++) {
cs.handle(new NodeUpdateSchedulerEvent(rmNodes[i]));
}
// Check am2 reserved resource from nm1-nm9
for (int i = 1; i < 9; i++) {
Assert.assertNotNull("Should reserve on nm-" + i,
cs.getNode(rmNodes[i].getNodeID()).getReservedContainer());
}
// Sleep the timeout interval, we should be able to see 6 containers selected
// 6 (selected) + 1 (allocated) which makes target capacity to 70%
Thread.sleep(1000);
ProportionalCapacityPreemptionPolicy editPolicy =
(ProportionalCapacityPreemptionPolicy) getSchedulingEditPolicy(rm1);
editPolicy.editSchedule();
checkNumberOfPreemptionCandidateFromApp(editPolicy, 6,
am1.getApplicationAttemptId());
// Call editSchedule again: selected containers are killed
editPolicy.editSchedule();
waitNumberOfLiveContainersFromApp(schedulerApp1, 3);
// Do allocation for all nms
for (int i = 1; i < 10; i++) {
cs.handle(new NodeUpdateSchedulerEvent(rmNodes[i]));
}
waitNumberOfLiveContainersFromApp(schedulerApp2, 7);
waitNumberOfLiveContainersFromApp(schedulerApp1, 3);
rm1.close();
}
@Test(timeout = 600000)
public void testPriorityPreemptionFromHighestPriorityQueueAndOldestContainer()
throws Exception {
/**
* Test case: Submit two application (app1/app2) to different queues, queue
* structure:
*
* <pre>
* Root
* / | \
* a b c
* 45 45 10
* </pre>
*
* Priority of queue_a = 1
* Priority of queue_b = 2
*
* 1) 5 nodes (n0-n4) in the cluster, each of them has 4G.
*
* 2) app1 submit to queue-c first (AM=1G), it asked 4 * 1G containers
* We will allocate 1 container on each of n0-n4. AM on n4.
*
* 3) app2 submit to queue-a, AM container=0.5G, allocated on n0
* Ask for 2 * 3.5G containers. (Reserved on n0/n1)
*
* 4) app2 submit to queue-b, AM container=0.5G, allocated on n2
* Ask for 2 * 3.5G containers. (Reserved on n2/n3)
*
* First we will preempt container on n2 since it is the oldest container of
* Highest priority queue (b)
*/
// Total preemption = 1G per round, which is 5% of cluster resource (20G)
conf.setFloat(CapacitySchedulerConfiguration.TOTAL_PREEMPTION_PER_ROUND,
0.05f);
conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 512);
conf.setPUOrderingPolicyUnderUtilizedPreemptionEnabled(true);
conf.setPUOrderingPolicyUnderUtilizedPreemptionDelay(1000);
conf.setQueueOrderingPolicy(CapacitySchedulerConfiguration.ROOT,
CapacitySchedulerConfiguration.QUEUE_PRIORITY_UTILIZATION_ORDERING_POLICY);
// A/B has higher priority
conf.setQueuePriority(CapacitySchedulerConfiguration.ROOT + ".a", 1);
conf.setQueuePriority(CapacitySchedulerConfiguration.ROOT + ".b", 2);
conf.setCapacity(CapacitySchedulerConfiguration.ROOT + ".a", 45f);
conf.setCapacity(CapacitySchedulerConfiguration.ROOT + ".b", 45f);
conf.setCapacity(CapacitySchedulerConfiguration.ROOT + ".c", 10f);
MockRM rm1 = new MockRM(conf);
rm1.getRMContext().setNodeLabelManager(mgr);
rm1.start();
MockNM[] mockNMs = new MockNM[5];
for (int i = 0; i < 5; i++) {
mockNMs[i] = rm1.registerNode("h" + i + ":1234", 4 * GB);
}
CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
RMNode[] rmNodes = new RMNode[5];
for (int i = 0; i < 5; i++) {
rmNodes[i] = rm1.getRMContext().getRMNodes().get(mockNMs[i].getNodeId());
}
// launch an app to queue, AM container should be launched in nm1
RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "c");
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, mockNMs[4]);
am1.allocate("*", 1 * GB, 4, new ArrayList<ContainerId>());
// Do allocation for nm1-nm8
for (int i = 0; i < 4; i++) {
cs.handle(new NodeUpdateSchedulerEvent(rmNodes[i]));
}
// App1 should have 5 containers now, one for each node
FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt(
am1.getApplicationAttemptId());
Assert.assertEquals(5, schedulerApp1.getLiveContainers().size());
for (int i = 0; i < 5; i++) {
waitNumberOfLiveContainersOnNodeFromApp(cs.getNode(rmNodes[i].getNodeID()),
am1.getApplicationAttemptId(), 1);
}
// Submit app2 to queue-a and asks for a 0.5G container for AM (on n0)
RMApp app2 = rm1.submitApp(512, "app", "user", null, "a");
MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, mockNMs[0]);
FiCaSchedulerApp schedulerApp2 = cs.getApplicationAttempt(
ApplicationAttemptId.newInstance(app2.getApplicationId(), 1));
// Ask 2 * 3.5GB containers
am2.allocate("*", 3 * GB + 512, 2, new ArrayList<ContainerId>());
// Do allocation for n0-n1
for (int i = 0; i < 2; i++) {
cs.handle(new NodeUpdateSchedulerEvent(rmNodes[i]));
}
// Check am2 reserved resource from nm0-nm1
for (int i = 0; i < 2; i++) {
Assert.assertNotNull("Should reserve on nm-" + i,
cs.getNode(rmNodes[i].getNodeID()).getReservedContainer());
Assert.assertEquals(cs.getNode(rmNodes[i].getNodeID())
.getReservedContainer().getQueueName(), "a");
}
// Submit app3 to queue-b and asks for a 0.5G container for AM (on n2)
RMApp app3 = rm1.submitApp(512, "app", "user", null, "b");
MockAM am3 = MockRM.launchAndRegisterAM(app3, rm1, mockNMs[2]);
FiCaSchedulerApp schedulerApp3 = cs.getApplicationAttempt(
ApplicationAttemptId.newInstance(app3.getApplicationId(), 1));
// Ask 2 * 3.5GB containers
am3.allocate("*", 3 * GB + 512, 2, new ArrayList<ContainerId>());
// Do allocation for n2-n3
for (int i = 2; i < 4; i++) {
cs.handle(new NodeUpdateSchedulerEvent(rmNodes[i]));
}
// Check am2 reserved resource from nm2-nm3
for (int i = 2; i < 4; i++) {
Assert.assertNotNull("Should reserve on nm-" + i,
cs.getNode(rmNodes[i].getNodeID()).getReservedContainer());
Assert.assertEquals(cs.getNode(rmNodes[i].getNodeID())
.getReservedContainer().getQueueName(), "b");
}
// Sleep the timeout interval, we should be able to see 1 container selected
Thread.sleep(1000);
/* 1st container preempted is on n2 */
ProportionalCapacityPreemptionPolicy editPolicy =
(ProportionalCapacityPreemptionPolicy) getSchedulingEditPolicy(rm1);
editPolicy.editSchedule();
// We should have one to-preempt container, on node[2]
Set<RMContainer> selectedToPreempt =
editPolicy.getToPreemptContainers().keySet();
Assert.assertEquals(1, selectedToPreempt.size());
Assert.assertEquals(mockNMs[2].getNodeId(),
selectedToPreempt.iterator().next().getAllocatedNode());
// Call editSchedule again: selected containers are killed
editPolicy.editSchedule();
// Do allocation for all nms
for (int i = 0; i < 4; i++) {
cs.handle(new NodeUpdateSchedulerEvent(rmNodes[i]));
}
waitNumberOfLiveContainersFromApp(schedulerApp1, 4);
waitNumberOfLiveContainersFromApp(schedulerApp1, 4);
waitNumberOfLiveContainersFromApp(schedulerApp2, 1);
waitNumberOfLiveContainersFromApp(schedulerApp3, 2);
/* 2nd container preempted is on n3 */
editPolicy.editSchedule();
// We should have one to-preempt container, on node[3]
selectedToPreempt =
editPolicy.getToPreemptContainers().keySet();
Assert.assertEquals(1, selectedToPreempt.size());
Assert.assertEquals(mockNMs[3].getNodeId(),
selectedToPreempt.iterator().next().getAllocatedNode());
// Call editSchedule again: selected containers are killed
editPolicy.editSchedule();
waitNumberOfLiveContainersFromApp(schedulerApp1, 3);
// Do allocation for all nms
for (int i = 0; i < 4; i++) {
cs.handle(new NodeUpdateSchedulerEvent(rmNodes[i]));
}
waitNumberOfLiveContainersFromApp(schedulerApp1, 3);
waitNumberOfLiveContainersFromApp(schedulerApp2, 1);
waitNumberOfLiveContainersFromApp(schedulerApp3, 3);
/* 3rd container preempted is on n0 */
editPolicy.editSchedule();
// We should have one to-preempt container, on node[0]
selectedToPreempt =
editPolicy.getToPreemptContainers().keySet();
Assert.assertEquals(1, selectedToPreempt.size());
Assert.assertEquals(mockNMs[0].getNodeId(),
selectedToPreempt.iterator().next().getAllocatedNode());
// Call editSchedule again: selected containers are killed
editPolicy.editSchedule();
waitNumberOfLiveContainersFromApp(schedulerApp1, 2);
// Do allocation for all nms
for (int i = 0; i < 4; i++) {
cs.handle(new NodeUpdateSchedulerEvent(rmNodes[i]));
}
waitNumberOfLiveContainersFromApp(schedulerApp1, 2);
waitNumberOfLiveContainersFromApp(schedulerApp2, 2);
waitNumberOfLiveContainersFromApp(schedulerApp3, 3);
/* 4th container preempted is on n1 */
editPolicy.editSchedule();
// We should have one to-preempt container, on node[0]
selectedToPreempt =
editPolicy.getToPreemptContainers().keySet();
Assert.assertEquals(1, selectedToPreempt.size());
Assert.assertEquals(mockNMs[1].getNodeId(),
selectedToPreempt.iterator().next().getAllocatedNode());
// Call editSchedule again: selected containers are killed
editPolicy.editSchedule();
waitNumberOfLiveContainersFromApp(schedulerApp1, 1);
// Do allocation for all nms
for (int i = 0; i < 4; i++) {
cs.handle(new NodeUpdateSchedulerEvent(rmNodes[i]));
}
waitNumberOfLiveContainersFromApp(schedulerApp1, 1);
waitNumberOfLiveContainersFromApp(schedulerApp2, 3);
waitNumberOfLiveContainersFromApp(schedulerApp3, 3);
rm1.close();
}
}

View File

@ -97,9 +97,6 @@ public void setUp() throws Exception {
Resources.createResource(16*GB, 32));
when(csContext.getClusterResource()).
thenReturn(Resources.createResource(100 * 16 * GB, 100 * 32));
when(csContext.getNonPartitionedQueueComparator()).
thenReturn(
CapacitySchedulerQueueManager.NON_PARTITIONED_QUEUE_COMPARATOR);
when(csContext.getResourceCalculator()).
thenReturn(resourceComparator);
when(csContext.getRMContext()).thenReturn(rmContext);

View File

@ -776,4 +776,115 @@ public void testPendingResourcesConsideringUserLimit() throws Exception {
Resources.createResource(20 * GB), "", true).getMemorySize());
rm1.close();
}
@Test(timeout = 60000)
public void testQueuePriorityOrdering() throws Exception {
CapacitySchedulerConfiguration newConf =
(CapacitySchedulerConfiguration) TestUtils
.getConfigurationWithMultipleQueues(conf);
// Set ordering policy
newConf.setQueueOrderingPolicy(CapacitySchedulerConfiguration.ROOT,
CapacitySchedulerConfiguration.QUEUE_PRIORITY_UTILIZATION_ORDERING_POLICY);
// Set maximum capacity of A to 20
newConf.setMaximumCapacity(CapacitySchedulerConfiguration.ROOT + ".a", 20);
newConf.setQueuePriority(CapacitySchedulerConfiguration.ROOT + ".c", 1);
newConf.setQueuePriority(CapacitySchedulerConfiguration.ROOT + ".b", 2);
newConf.setQueuePriority(CapacitySchedulerConfiguration.ROOT + ".a", 3);
MockRM rm1 = new MockRM(newConf);
rm1.getRMContext().setNodeLabelManager(mgr);
rm1.start();
MockNM nm1 = rm1.registerNode("h1:1234", 100 * GB);
// launch an app to queue A, AM container should be launched in nm1
RMApp app1 = rm1.submitApp(2 * GB, "app", "user", null, "a");
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
// launch an app to queue B, AM container should be launched in nm1
RMApp app2 = rm1.submitApp(2 * GB, "app", "user", null, "b");
MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm1);
// launch an app to queue C, AM container should be launched in nm1
RMApp app3 = rm1.submitApp(2 * GB, "app", "user", null, "c");
MockAM am3 = MockRM.launchAndRegisterAM(app3, rm1, nm1);
// Each application asks 10 * 5GB containers
am1.allocate("*", 5 * GB, 10, null);
am2.allocate("*", 5 * GB, 10, null);
am3.allocate("*", 5 * GB, 10, null);
CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
FiCaSchedulerApp schedulerApp1 =
cs.getApplicationAttempt(am1.getApplicationAttemptId());
FiCaSchedulerApp schedulerApp2 =
cs.getApplicationAttempt(am2.getApplicationAttemptId());
FiCaSchedulerApp schedulerApp3 =
cs.getApplicationAttempt(am3.getApplicationAttemptId());
// container will be allocated to am1
// App1 will get 2 container allocated (plus AM container)
cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
Assert.assertEquals(2, schedulerApp1.getLiveContainers().size());
Assert.assertEquals(1, schedulerApp2.getLiveContainers().size());
Assert.assertEquals(1, schedulerApp3.getLiveContainers().size());
// container will be allocated to am1 again,
// App1 will get 3 container allocated (plus AM container)
cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
Assert.assertEquals(3, schedulerApp1.getLiveContainers().size());
Assert.assertEquals(1, schedulerApp2.getLiveContainers().size());
Assert.assertEquals(1, schedulerApp3.getLiveContainers().size());
// (Now usages of queues: a=12G (satisfied), b=2G, c=2G)
// container will be allocated to am2 (since app1 reaches its guaranteed
// capacity)
// App2 will get 2 container allocated (plus AM container)
cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
Assert.assertEquals(3, schedulerApp1.getLiveContainers().size());
Assert.assertEquals(2, schedulerApp2.getLiveContainers().size());
Assert.assertEquals(1, schedulerApp3.getLiveContainers().size());
// Do this 3 times
// container will be allocated to am2 (since app1 reaches its guaranteed
// capacity)
// App2 will get 2 container allocated (plus AM container)
for (int i = 0; i < 3; i++) {
cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
}
Assert.assertEquals(3, schedulerApp1.getLiveContainers().size());
Assert.assertEquals(5, schedulerApp2.getLiveContainers().size());
Assert.assertEquals(1, schedulerApp3.getLiveContainers().size());
// (Now usages of queues: a=12G (satisfied), b=22G (satisfied), c=2G))
// Do this 10 times
for (int i = 0; i < 10; i++) {
cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
}
Assert.assertEquals(3, schedulerApp1.getLiveContainers().size());
Assert.assertEquals(5, schedulerApp2.getLiveContainers().size());
Assert.assertEquals(11, schedulerApp3.getLiveContainers().size());
// (Now usages of queues: a=12G (satisfied), b=22G (satisfied),
// c=52G (satisfied and no pending))
// Do this 20 times, we can only allocate 3 containers, 1 to A and 3 to B
for (int i = 0; i < 20; i++) {
cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
}
Assert.assertEquals(4, schedulerApp1.getLiveContainers().size());
Assert.assertEquals(6, schedulerApp2.getLiveContainers().size());
Assert.assertEquals(11, schedulerApp3.getLiveContainers().size());
// (Now usages of queues: a=17G (satisfied), b=27G (satisfied), c=52G))
rm1.close();
}
}

View File

@ -176,9 +176,6 @@ private void setUpInternal(ResourceCalculator rC) throws Exception {
thenReturn(Resources.createResource(16*GB, 32));
when(csContext.getClusterResource()).
thenReturn(Resources.createResource(100 * 16 * GB, 100 * 32));
when(csContext.getNonPartitionedQueueComparator()).
thenReturn(
CapacitySchedulerQueueManager.NON_PARTITIONED_QUEUE_COMPARATOR);
when(csContext.getResourceCalculator()).
thenReturn(resourceCalculator);
when(csContext.getPreemptionManager()).thenReturn(new PreemptionManager());
@ -415,7 +412,7 @@ public void testPolicyConfiguration() throws Exception {
"testPolicyRoot" + System.currentTimeMillis();
OrderingPolicy<FiCaSchedulerApp> comPol =
testConf.<FiCaSchedulerApp>getOrderingPolicy(tproot);
testConf.<FiCaSchedulerApp>getAppOrderingPolicy(tproot);
}
@ -490,16 +487,16 @@ public void testFairConfiguration() throws Exception {
"testPolicyRoot" + System.currentTimeMillis();
OrderingPolicy<FiCaSchedulerApp> schedOrder =
testConf.<FiCaSchedulerApp>getOrderingPolicy(tproot);
testConf.<FiCaSchedulerApp>getAppOrderingPolicy(tproot);
//override default to fair
String policyType = CapacitySchedulerConfiguration.PREFIX + tproot +
"." + CapacitySchedulerConfiguration.ORDERING_POLICY;
testConf.set(policyType,
CapacitySchedulerConfiguration.FAIR_ORDERING_POLICY);
CapacitySchedulerConfiguration.FAIR_APP_ORDERING_POLICY);
schedOrder =
testConf.<FiCaSchedulerApp>getOrderingPolicy(tproot);
testConf.<FiCaSchedulerApp>getAppOrderingPolicy(tproot);
FairOrderingPolicy fop = (FairOrderingPolicy<FiCaSchedulerApp>) schedOrder;
assertFalse(fop.getSizeBasedWeight());
@ -509,7 +506,7 @@ public void testFairConfiguration() throws Exception {
FairOrderingPolicy.ENABLE_SIZE_BASED_WEIGHT;
testConf.set(sbwConfig, "true");
schedOrder =
testConf.<FiCaSchedulerApp>getOrderingPolicy(tproot);
testConf.<FiCaSchedulerApp>getAppOrderingPolicy(tproot);
fop = (FairOrderingPolicy<FiCaSchedulerApp>) schedOrder;
assertTrue(fop.getSizeBasedWeight());

View File

@ -97,9 +97,6 @@ public void setUp() throws Exception {
Resources.createResource(16*GB, 32));
when(csContext.getClusterResource()).
thenReturn(Resources.createResource(100 * 16 * GB, 100 * 32));
when(csContext.getNonPartitionedQueueComparator()).
thenReturn(
CapacitySchedulerQueueManager.NON_PARTITIONED_QUEUE_COMPARATOR);
when(csContext.getPreemptionManager()).thenReturn(new PreemptionManager());
when(csContext.getResourceCalculator()).
thenReturn(resourceComparator);

View File

@ -133,8 +133,6 @@ private void setup(CapacitySchedulerConfiguration csConf,
Resources.createResource(16 * GB, 12));
when(csContext.getClusterResource()).thenReturn(
Resources.createResource(100 * 16 * GB, 100 * 12));
when(csContext.getNonPartitionedQueueComparator()).thenReturn(
CapacitySchedulerQueueManager.NON_PARTITIONED_QUEUE_COMPARATOR);
when(csContext.getResourceCalculator()).thenReturn(resourceCalculator);
when(csContext.getPreemptionManager()).thenReturn(new PreemptionManager());
when(csContext.getRMContext()).thenReturn(rmContext);

View File

@ -0,0 +1,222 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.policy;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.ImmutableTable;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacities;
import org.junit.Assert;
import org.junit.Test;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class TestPriorityUtilizationQueueOrderingPolicy {
private List<CSQueue> mockCSQueues(String[] queueNames, int[] priorities,
float[] utilizations, String partition) {
// sanity check
assert queueNames != null && priorities != null && utilizations != null
&& queueNames.length > 0 && queueNames.length == priorities.length
&& priorities.length == utilizations.length;
List<CSQueue> list = new ArrayList<>();
for (int i = 0; i < queueNames.length; i++) {
CSQueue q = mock(CSQueue.class);
when(q.getQueueName()).thenReturn(queueNames[i]);
QueueCapacities qc = new QueueCapacities(false);
qc.setUsedCapacity(partition, utilizations[i]);
when(q.getQueueCapacities()).thenReturn(qc);
when(q.getPriority()).thenReturn(Priority.newInstance(priorities[i]));
list.add(q);
}
return list;
}
private void verifyOrder(QueueOrderingPolicy orderingPolicy, String partition,
String[] expectedOrder) {
Iterator<CSQueue> iter = orderingPolicy.getAssignmentIterator(partition);
int i = 0;
while (iter.hasNext()) {
CSQueue q = iter.next();
Assert.assertEquals(expectedOrder[i], q.getQueueName());
i++;
}
assert i == expectedOrder.length;
}
@Test
public void testUtilizationOrdering() {
PriorityUtilizationQueueOrderingPolicy policy =
new PriorityUtilizationQueueOrderingPolicy(false);
// Case 1, one queue
policy.setQueues(mockCSQueues(new String[] { "a" }, new int[] { 0 },
new float[] { 0.1f }, ""));
verifyOrder(policy, "", new String[] { "a" });
// Case 2, 2 queues
policy.setQueues(mockCSQueues(new String[] { "a", "b" }, new int[] { 0, 0 },
new float[] { 0.1f, 0.0f }, ""));
verifyOrder(policy, "", new String[] { "b", "a" });
// Case 3, 3 queues
policy.setQueues(
mockCSQueues(new String[] { "a", "b", "c" }, new int[] { 0, 0, 0 },
new float[] { 0.1f, 0.0f, 0.2f }, ""));
verifyOrder(policy, "", new String[] { "b", "a", "c" });
// Case 4, 3 queues, ignore priority
policy.setQueues(
mockCSQueues(new String[] { "a", "b", "c" }, new int[] { 2, 1, 0 },
new float[] { 0.1f, 0.0f, 0.2f }, ""));
verifyOrder(policy, "", new String[] { "b", "a", "c" });
// Case 5, 3 queues, look at partition (default)
policy.setQueues(
mockCSQueues(new String[] { "a", "b", "c" }, new int[] { 2, 1, 0 },
new float[] { 0.1f, 0.0f, 0.2f }, "x"));
verifyOrder(policy, "", new String[] { "a", "b", "c" });
// Case 5, 3 queues, look at partition (x)
policy.setQueues(
mockCSQueues(new String[] { "a", "b", "c" }, new int[] { 2, 1, 0 },
new float[] { 0.1f, 0.0f, 0.2f }, "x"));
verifyOrder(policy, "x", new String[] { "b", "a", "c" });
// Case 6, 3 queues, with different accessibility to partition
List<CSQueue> queues = mockCSQueues(new String[] { "a", "b", "c" }, new int[] { 2, 1, 0 },
new float[] { 0.1f, 0.0f, 0.2f }, "x");
// a can access "x"
when(queues.get(0).getAccessibleNodeLabels()).thenReturn(ImmutableSet.of("x", "y"));
// c can access "x"
when(queues.get(2).getAccessibleNodeLabels()).thenReturn(ImmutableSet.of("x", "y"));
policy.setQueues(queues);
verifyOrder(policy, "x", new String[] { "a", "c", "b" });
}
@Test
public void testPriorityUtilizationOrdering() {
PriorityUtilizationQueueOrderingPolicy policy =
new PriorityUtilizationQueueOrderingPolicy(true);
// Case 1, one queue
policy.setQueues(mockCSQueues(new String[] { "a" }, new int[] { 1 },
new float[] { 0.1f }, ""));
verifyOrder(policy, "", new String[] { "a" });
// Case 2, 2 queues, both under utilized, same priority
policy.setQueues(mockCSQueues(new String[] { "a", "b" }, new int[] { 1, 1 },
new float[] { 0.2f, 0.1f }, ""));
verifyOrder(policy, "", new String[] { "b", "a" });
// Case 3, 2 queues, both over utilized, same priority
policy.setQueues(mockCSQueues(new String[] { "a", "b" }, new int[] { 1, 1 },
new float[] { 1.1f, 1.2f }, ""));
verifyOrder(policy, "", new String[] { "a", "b" });
// Case 4, 2 queues, one under and one over, same priority
policy.setQueues(mockCSQueues(new String[] { "a", "b" }, new int[] { 1, 1 },
new float[] { 0.1f, 1.2f }, ""));
verifyOrder(policy, "", new String[] { "a", "b" });
// Case 5, 2 queues, both over utilized, different priority
policy.setQueues(mockCSQueues(new String[] { "a", "b" }, new int[] { 1, 2 },
new float[] { 1.1f, 1.2f }, ""));
verifyOrder(policy, "", new String[] { "b", "a" });
// Case 6, 2 queues, both under utilized, different priority
policy.setQueues(mockCSQueues(new String[] { "a", "b" }, new int[] { 1, 2 },
new float[] { 0.1f, 0.2f }, ""));
verifyOrder(policy, "", new String[] { "b", "a" });
// Case 7, 2 queues, one under utilized and one over utilized,
// different priority (1)
policy.setQueues(mockCSQueues(new String[] { "a", "b" }, new int[] { 1, 2 },
new float[] { 0.1f, 1.2f }, ""));
verifyOrder(policy, "", new String[] { "a", "b" });
// Case 8, 2 queues, one under utilized and one over utilized,
// different priority (1)
policy.setQueues(mockCSQueues(new String[] { "a", "b" }, new int[] { 2, 1 },
new float[] { 0.1f, 1.2f }, ""));
verifyOrder(policy, "", new String[] { "a", "b" });
// Case 9, 2 queues, one under utilized and one meet, different priority (1)
policy.setQueues(mockCSQueues(new String[] { "a", "b" }, new int[] { 1, 2 },
new float[] { 0.1f, 1.0f }, ""));
verifyOrder(policy, "", new String[] { "a", "b" });
// Case 10, 2 queues, one under utilized and one meet, different priority (2)
policy.setQueues(mockCSQueues(new String[] { "a", "b" }, new int[] { 2, 1 },
new float[] { 0.1f, 1.0f }, ""));
verifyOrder(policy, "", new String[] { "a", "b" });
// Case 11, 2 queues, one under utilized and one meet, same priority
policy.setQueues(mockCSQueues(new String[] { "a", "b" }, new int[] { 1, 1 },
new float[] { 0.1f, 1.0f }, ""));
verifyOrder(policy, "", new String[] { "a", "b" });
// Case 12, 2 queues, both meet, different priority
policy.setQueues(mockCSQueues(new String[] { "a", "b" }, new int[] { 1, 2 },
new float[] { 1.0f, 1.0f }, ""));
verifyOrder(policy, "", new String[] { "b", "a" });
// Case 13, 5 queues, different priority
policy.setQueues(mockCSQueues(new String[] { "a", "b", "c", "d", "e" },
new int[] { 1, 2, 0, 0, 3 },
new float[] { 1.2f, 1.0f, 0.2f, 1.1f, 0.2f }, ""));
verifyOrder(policy, "", new String[] { "e", "c", "b", "a", "d" });
// Case 14, 5 queues, different priority, partition default;
policy.setQueues(mockCSQueues(new String[] { "a", "b", "c", "d", "e" },
new int[] { 1, 2, 0, 0, 3 },
new float[] { 1.2f, 1.0f, 0.2f, 1.1f, 0.2f }, "x"));
verifyOrder(policy, "", new String[] { "e", "b", "a", "c", "d" });
// Case 15, 5 queues, different priority, partition x;
policy.setQueues(mockCSQueues(new String[] { "a", "b", "c", "d", "e" },
new int[] { 1, 2, 0, 0, 3 },
new float[] { 1.2f, 1.0f, 0.2f, 1.1f, 0.2f }, "x"));
verifyOrder(policy, "x", new String[] { "e", "c", "b", "a", "d" });
// Case 16, 5 queues, different priority, partition x; and different
// accessibility
List<CSQueue> queues = mockCSQueues(new String[] { "a", "b", "c", "d", "e" },
new int[] { 1, 2, 0, 0, 3 },
new float[] { 1.2f, 1.0f, 0.2f, 1.1f, 0.2f }, "x");
// Only a/d has access to x
when(queues.get(0).getAccessibleNodeLabels()).thenReturn(
ImmutableSet.of("x"));
when(queues.get(3).getAccessibleNodeLabels()).thenReturn(
ImmutableSet.of("x"));
policy.setQueues(queues);
verifyOrder(policy, "x", new String[] { "a", "d", "e", "c", "b" });
}
}

View File

@ -20,23 +20,14 @@
import java.util.*;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeLabel;
import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
import org.junit.Assert;
import org.junit.Test;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
@ -157,7 +148,7 @@ public void testSizeBasedWeightNotAffectAppActivation() throws Exception {
// Define top-level queues
String queuePath = CapacitySchedulerConfiguration.ROOT + ".default";
csConf.setOrderingPolicy(queuePath, CapacitySchedulerConfiguration.FAIR_ORDERING_POLICY);
csConf.setOrderingPolicy(queuePath, CapacitySchedulerConfiguration.FAIR_APP_ORDERING_POLICY);
csConf.setOrderingPolicyParameter(queuePath,
FairOrderingPolicy.ENABLE_SIZE_BASED_WEIGHT, "true");
csConf.setMaximumApplicationMasterResourcePerQueuePercent(queuePath, 0.1f);