YARN-3243. CapacityScheduler should pass headroom from parent to children to make sure ParentQueue obey its capacity limits. Contributed by Wangda Tan.

(cherry picked from commit 487374b7fe)
This commit is contained in:
Jian He 2015-03-17 10:22:15 -07:00
parent 895588b439
commit 1c601e492f
13 changed files with 560 additions and 516 deletions

View File

@ -8,6 +8,9 @@ Release 2.8.0 - UNRELEASED
IMPROVEMENTS
YARN-3243. CapacityScheduler should pass headroom from parent to children
to make sure ParentQueue obey its capacity limits. (Wangda Tan via jianhe)
OPTIMIZATIONS
YARN-3339. TestDockerContainerExecutor should pull a single image and not

View File

@ -20,10 +20,13 @@
import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AccessControlList;
@ -34,6 +37,7 @@
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
import org.apache.hadoop.yarn.security.AccessType;
import org.apache.hadoop.yarn.security.PrivilegedEntity;
import org.apache.hadoop.yarn.security.PrivilegedEntity.EntityType;
@ -49,6 +53,7 @@
import com.google.common.collect.Sets;
public abstract class AbstractCSQueue implements CSQueue {
private static final Log LOG = LogFactory.getLog(AbstractCSQueue.class);
CSQueue parent;
final String queueName;
@ -406,21 +411,102 @@ private boolean isQueueHierarchyPreemptionDisabled(CSQueue q) {
parentQ.getPreemptionDisabled());
}
protected Resource getCurrentResourceLimit(Resource clusterResource,
ResourceLimits currentResourceLimits) {
private Resource getCurrentLimitResource(String nodeLabel,
Resource clusterResource, ResourceLimits currentResourceLimits) {
/*
* Queue's max available resource = min(my.max, my.limit)
* my.limit is set by my parent, considered used resource of my siblings
* Current limit resource: For labeled resource: limit = queue-max-resource
* (TODO, this part need update when we support labeled-limit) For
* non-labeled resource: limit = min(queue-max-resource,
* limit-set-by-parent)
*/
Resource queueMaxResource =
Resources.multiplyAndNormalizeDown(resourceCalculator, clusterResource,
queueCapacities.getAbsoluteMaximumCapacity(), minimumAllocation);
Resource queueCurrentResourceLimit =
Resources.min(resourceCalculator, clusterResource, queueMaxResource,
currentResourceLimits.getLimit());
queueCurrentResourceLimit =
Resources.roundDown(resourceCalculator, queueCurrentResourceLimit,
minimumAllocation);
return queueCurrentResourceLimit;
Resources.multiplyAndNormalizeDown(resourceCalculator,
labelManager.getResourceByLabel(nodeLabel, clusterResource),
queueCapacities.getAbsoluteMaximumCapacity(nodeLabel), minimumAllocation);
if (nodeLabel.equals(RMNodeLabelsManager.NO_LABEL)) {
return Resources.min(resourceCalculator, clusterResource,
queueMaxResource, currentResourceLimits.getLimit());
}
return queueMaxResource;
}
synchronized boolean canAssignToThisQueue(Resource clusterResource,
Set<String> nodeLabels, ResourceLimits currentResourceLimits,
Resource nowRequired, Resource resourceCouldBeUnreserved) {
// Get label of this queue can access, it's (nodeLabel AND queueLabel)
Set<String> labelCanAccess;
if (null == nodeLabels || nodeLabels.isEmpty()) {
labelCanAccess = new HashSet<String>();
// Any queue can always access any node without label
labelCanAccess.add(RMNodeLabelsManager.NO_LABEL);
} else {
labelCanAccess = new HashSet<String>(
accessibleLabels.contains(CommonNodeLabelsManager.ANY) ? nodeLabels
: Sets.intersection(accessibleLabels, nodeLabels));
}
for (String label : labelCanAccess) {
// New total resource = used + required
Resource newTotalResource =
Resources.add(queueUsage.getUsed(label), nowRequired);
Resource currentLimitResource =
getCurrentLimitResource(label, clusterResource, currentResourceLimits);
// if reservation continous looking enabled, check to see if could we
// potentially use this node instead of a reserved node if the application
// has reserved containers.
// TODO, now only consider reservation cases when the node has no label
if (this.reservationsContinueLooking
&& label.equals(RMNodeLabelsManager.NO_LABEL)
&& Resources.greaterThan(resourceCalculator, clusterResource,
resourceCouldBeUnreserved, Resources.none())) {
// resource-without-reserved = used - reserved
Resource newTotalWithoutReservedResource =
Resources.subtract(newTotalResource, resourceCouldBeUnreserved);
// when total-used-without-reserved-resource < currentLimit, we still
// have chance to allocate on this node by unreserving some containers
if (Resources.lessThan(resourceCalculator, clusterResource,
newTotalWithoutReservedResource, currentLimitResource)) {
if (LOG.isDebugEnabled()) {
LOG.debug("try to use reserved: " + getQueueName()
+ " usedResources: " + queueUsage.getUsed()
+ ", clusterResources: " + clusterResource
+ ", reservedResources: " + resourceCouldBeUnreserved
+ ", capacity-without-reserved: "
+ newTotalWithoutReservedResource + ", maxLimitCapacity: "
+ currentLimitResource);
}
return true;
}
}
// Otherwise, if any of the label of this node beyond queue limit, we
// cannot allocate on this node. Consider a small epsilon here.
if (Resources.greaterThan(resourceCalculator, clusterResource,
newTotalResource, currentLimitResource)) {
return false;
}
if (LOG.isDebugEnabled()) {
LOG.debug(getQueueName()
+ "Check assign to queue, label=" + label
+ " usedResources: " + queueUsage.getUsed(label)
+ " clusterResources: " + clusterResource
+ " currentUsedCapacity "
+ Resources.divide(resourceCalculator, clusterResource,
queueUsage.getUsed(label),
labelManager.getResourceByLabel(label, clusterResource))
+ " max-capacity: "
+ queueCapacities.getAbsoluteMaximumCapacity(label)
+ ")");
}
return true;
}
// Actually, this will not happen, since labelCanAccess will be always
// non-empty
return false;
}
}

View File

@ -189,13 +189,11 @@ public void finishApplicationAttempt(FiCaSchedulerApp application,
* Assign containers to applications in the queue or it's children (if any).
* @param clusterResource the resource of the cluster.
* @param node node on which resources are available
* @param needToUnreserve assign container only if it can unreserve one first
* @param resourceLimits how much overall resource of this queue can use.
* @return the assignment
*/
public CSAssignment assignContainers(Resource clusterResource,
FiCaSchedulerNode node, boolean needToUnreserve,
ResourceLimits resourceLimits);
FiCaSchedulerNode node, ResourceLimits resourceLimits);
/**
* A container assigned to the queue has completed.

View File

@ -1061,9 +1061,14 @@ private synchronized void allocateContainersToNode(FiCaSchedulerNode node) {
node.getNodeID());
LeafQueue queue = ((LeafQueue)reservedApplication.getQueue());
CSAssignment assignment = queue.assignContainers(clusterResource, node,
false, new ResourceLimits(
clusterResource));
CSAssignment assignment =
queue.assignContainers(
clusterResource,
node,
// TODO, now we only consider limits for parent for non-labeled
// resources, should consider labeled resources as well.
new ResourceLimits(labelManager.getResourceByLabel(
RMNodeLabelsManager.NO_LABEL, clusterResource)));
RMContainer excessReservation = assignment.getExcessReservation();
if (excessReservation != null) {
@ -1087,8 +1092,13 @@ false, new ResourceLimits(
LOG.debug("Trying to schedule on node: " + node.getNodeName() +
", available: " + node.getAvailableResource());
}
root.assignContainers(clusterResource, node, false, new ResourceLimits(
clusterResource));
root.assignContainers(
clusterResource,
node,
// TODO, now we only consider limits for parent for non-labeled
// resources, should consider labeled resources as well.
new ResourceLimits(labelManager.getResourceByLabel(
RMNodeLabelsManager.NO_LABEL, clusterResource)));
}
} else {
LOG.info("Skipping scheduling since node " + node.getNodeID() +
@ -1209,6 +1219,13 @@ private synchronized void addNode(RMNode nodeManager) {
usePortForNodeName, nodeManager.getNodeLabels());
this.nodes.put(nodeManager.getNodeID(), schedulerNode);
Resources.addTo(clusterResource, nodeManager.getTotalCapability());
// update this node to node label manager
if (labelManager != null) {
labelManager.activateNode(nodeManager.getNodeID(),
nodeManager.getTotalCapability());
}
root.updateClusterResource(clusterResource, new ResourceLimits(
clusterResource));
int numNodes = numNodeManagers.incrementAndGet();
@ -1220,12 +1237,6 @@ private synchronized void addNode(RMNode nodeManager) {
if (scheduleAsynchronously && numNodes == 1) {
asyncSchedulerThread.beginSchedule();
}
// update this node to node label manager
if (labelManager != null) {
labelManager.activateNode(nodeManager.getNodeID(),
nodeManager.getTotalCapability());
}
}
private synchronized void removeNode(RMNode nodeInfo) {

View File

@ -76,7 +76,6 @@
import org.apache.hadoop.yarn.util.resource.Resources;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Sets;
@Private
@Unstable
@ -157,7 +156,7 @@ protected synchronized void setupQueueConfigs(Resource clusterResource)
// and all queues may not be realized yet, we'll use (optimistic)
// absoluteMaxCapacity (it will be replaced with the more accurate
// absoluteMaxAvailCapacity during headroom/userlimit/allocation events)
computeQueueCurrentLimitAndSetHeadroomInfo(clusterResource);
setQueueResourceLimitsInfo(clusterResource);
CapacitySchedulerConfiguration conf = csContext.getConfiguration();
userLimit = conf.getUserLimit(getQueuePath());
@ -739,9 +738,8 @@ private static Set<String> getRequestLabelSetByExpression(
@Override
public synchronized CSAssignment assignContainers(Resource clusterResource,
FiCaSchedulerNode node, boolean needToUnreserve,
ResourceLimits currentResourceLimits) {
this.currentResourceLimits = currentResourceLimits;
FiCaSchedulerNode node, ResourceLimits currentResourceLimits) {
updateCurrentResourceLimits(currentResourceLimits, clusterResource);
if(LOG.isDebugEnabled()) {
LOG.debug("assignContainers: node=" + node.getNodeName()
@ -796,7 +794,7 @@ public synchronized CSAssignment assignContainers(Resource clusterResource,
continue;
}
if (!this.reservationsContinueLooking) {
if (!needContainers(application, priority, required)) {
if (!shouldAllocOrReserveNewContainer(application, priority, required)) {
if (LOG.isDebugEnabled()) {
LOG.debug("doesn't need containers based on reservation algo!");
}
@ -818,8 +816,8 @@ public synchronized CSAssignment assignContainers(Resource clusterResource,
required, requestedNodeLabels);
// Check queue max-capacity limit
if (!canAssignToThisQueue(clusterResource, required,
node.getLabels(), application, true)) {
if (!super.canAssignToThisQueue(clusterResource, node.getLabels(),
this.currentResourceLimits, required, application.getCurrentReservation())) {
return NULL_ASSIGNMENT;
}
@ -835,7 +833,7 @@ public synchronized CSAssignment assignContainers(Resource clusterResource,
// Try to schedule
CSAssignment assignment =
assignContainersOnNode(clusterResource, node, application, priority,
null, needToUnreserve);
null);
// Did the application skip this node?
if (assignment.getSkipped()) {
@ -896,7 +894,7 @@ private synchronized CSAssignment assignReservedContainer(
// Try to assign if we have sufficient resources
assignContainersOnNode(clusterResource, node, application, priority,
rmContainer, false);
rmContainer);
// Doesn't matter... since it's already charged for at time of reservation
// "re-reservation" is *free*
@ -938,102 +936,14 @@ private Resource getHeadroom(User user, Resource currentResourceLimit,
Resources.roundDown(resourceCalculator, headroom, minimumAllocation);
return headroom;
}
synchronized boolean canAssignToThisQueue(Resource clusterResource,
Resource required, Set<String> nodeLabels, FiCaSchedulerApp application,
boolean checkReservations) {
// Get label of this queue can access, it's (nodeLabel AND queueLabel)
Set<String> labelCanAccess;
if (null == nodeLabels || nodeLabels.isEmpty()) {
labelCanAccess = new HashSet<String>();
// Any queue can always access any node without label
labelCanAccess.add(RMNodeLabelsManager.NO_LABEL);
} else {
labelCanAccess = new HashSet<String>(Sets.intersection(accessibleLabels, nodeLabels));
}
boolean canAssign = true;
for (String label : labelCanAccess) {
Resource potentialTotalCapacity =
Resources.add(queueUsage.getUsed(label), required);
float potentialNewCapacity =
Resources.divide(resourceCalculator, clusterResource,
potentialTotalCapacity,
labelManager.getResourceByLabel(label, clusterResource));
// if enabled, check to see if could we potentially use this node instead
// of a reserved node if the application has reserved containers
// TODO, now only consider reservation cases when the node has no label
if (this.reservationsContinueLooking && checkReservations
&& label.equals(RMNodeLabelsManager.NO_LABEL)) {
float potentialNewWithoutReservedCapacity = Resources.divide(
resourceCalculator,
clusterResource,
Resources.subtract(potentialTotalCapacity,
application.getCurrentReservation()),
labelManager.getResourceByLabel(label, clusterResource));
if (potentialNewWithoutReservedCapacity <= queueCapacities
.getAbsoluteMaximumCapacity()) {
if (LOG.isDebugEnabled()) {
LOG.debug("try to use reserved: "
+ getQueueName()
+ " usedResources: "
+ queueUsage.getUsed()
+ " clusterResources: "
+ clusterResource
+ " reservedResources: "
+ application.getCurrentReservation()
+ " currentCapacity "
+ Resources.divide(resourceCalculator, clusterResource,
queueUsage.getUsed(), clusterResource) + " required " + required
+ " potentialNewWithoutReservedCapacity: "
+ potentialNewWithoutReservedCapacity + " ( "
+ " max-capacity: "
+ queueCapacities.getAbsoluteMaximumCapacity() + ")");
}
// we could potentially use this node instead of reserved node
return true;
}
}
// Otherwise, if any of the label of this node beyond queue limit, we
// cannot allocate on this node. Consider a small epsilon here.
if (potentialNewCapacity > queueCapacities
.getAbsoluteMaximumCapacity(label) + 1e-4) {
canAssign = false;
break;
}
if (LOG.isDebugEnabled()) {
LOG.debug(getQueueName()
+ "Check assign to queue, label=" + label
+ " usedResources: " + queueUsage.getUsed(label)
+ " clusterResources: " + clusterResource
+ " currentCapacity "
+ Resources.divide(resourceCalculator, clusterResource,
queueUsage.getUsed(label),
labelManager.getResourceByLabel(label, clusterResource))
+ " potentialNewCapacity: " + potentialNewCapacity + " ( "
+ " max-capacity: " + queueCapacities.getAbsoluteMaximumCapacity()
+ ")");
}
}
return canAssign;
}
private Resource computeQueueCurrentLimitAndSetHeadroomInfo(
private void setQueueResourceLimitsInfo(
Resource clusterResource) {
Resource queueCurrentResourceLimit =
getCurrentResourceLimit(clusterResource, currentResourceLimits);
synchronized (queueResourceLimitsInfo) {
queueResourceLimitsInfo.setQueueCurrentLimit(queueCurrentResourceLimit);
queueResourceLimitsInfo.setQueueCurrentLimit(currentResourceLimits
.getLimit());
queueResourceLimitsInfo.setClusterResource(clusterResource);
}
return queueCurrentResourceLimit;
}
@Lock({LeafQueue.class, FiCaSchedulerApp.class})
@ -1048,16 +958,16 @@ Resource computeUserLimitAndSetHeadroom(FiCaSchedulerApp application,
computeUserLimit(application, clusterResource, required,
queueUser, requestedLabels);
Resource currentResourceLimit =
computeQueueCurrentLimitAndSetHeadroomInfo(clusterResource);
setQueueResourceLimitsInfo(clusterResource);
Resource headroom =
getHeadroom(queueUser, currentResourceLimit, clusterResource, userLimit);
getHeadroom(queueUser, currentResourceLimits.getLimit(),
clusterResource, userLimit);
if (LOG.isDebugEnabled()) {
LOG.debug("Headroom calculation for user " + user + ": " +
" userLimit=" + userLimit +
" queueMaxAvailRes=" + currentResourceLimit +
" queueMaxAvailRes=" + currentResourceLimits.getLimit() +
" consumed=" + queueUser.getUsed() +
" headroom=" + headroom);
}
@ -1207,8 +1117,8 @@ protected synchronized boolean assignToUser(Resource clusterResource,
return true;
}
boolean needContainers(FiCaSchedulerApp application, Priority priority,
Resource required) {
boolean shouldAllocOrReserveNewContainer(FiCaSchedulerApp application,
Priority priority, Resource required) {
int requiredContainers = application.getTotalRequiredResources(priority);
int reservedContainers = application.getNumReservedContainers(priority);
int starvation = 0;
@ -1240,7 +1150,7 @@ resourceCalculator, required, getMaximumAllocation()
private CSAssignment assignContainersOnNode(Resource clusterResource,
FiCaSchedulerNode node, FiCaSchedulerApp application, Priority priority,
RMContainer reservedContainer, boolean needToUnreserve) {
RMContainer reservedContainer) {
Resource assigned = Resources.none();
NodeType requestType = null;
@ -1252,7 +1162,7 @@ private CSAssignment assignContainersOnNode(Resource clusterResource,
requestType = NodeType.NODE_LOCAL;
assigned =
assignNodeLocalContainers(clusterResource, nodeLocalResourceRequest,
node, application, priority, reservedContainer, needToUnreserve,
node, application, priority, reservedContainer,
allocatedContainer);
if (Resources.greaterThan(resourceCalculator, clusterResource,
assigned, Resources.none())) {
@ -1280,7 +1190,7 @@ private CSAssignment assignContainersOnNode(Resource clusterResource,
assigned =
assignRackLocalContainers(clusterResource, rackLocalResourceRequest,
node, application, priority, reservedContainer, needToUnreserve,
node, application, priority, reservedContainer,
allocatedContainer);
if (Resources.greaterThan(resourceCalculator, clusterResource,
assigned, Resources.none())) {
@ -1308,7 +1218,7 @@ private CSAssignment assignContainersOnNode(Resource clusterResource,
assigned =
assignOffSwitchContainers(clusterResource, offSwitchResourceRequest,
node, application, priority, reservedContainer, needToUnreserve,
node, application, priority, reservedContainer,
allocatedContainer);
// update locality statistics
@ -1320,13 +1230,24 @@ private CSAssignment assignContainersOnNode(Resource clusterResource,
return SKIP_ASSIGNMENT;
}
private Resource getMinimumResourceNeedUnreserved(Resource askedResource) {
// First we need to get minimum resource we need unreserve
// minimum-resource-need-unreserve = used + asked - limit
Resource minimumUnreservedResource =
Resources.subtract(Resources.add(queueUsage.getUsed(), askedResource),
currentResourceLimits.getLimit());
return minimumUnreservedResource;
}
@Private
protected boolean findNodeToUnreserve(Resource clusterResource,
FiCaSchedulerNode node, FiCaSchedulerApp application, Priority priority,
Resource capability) {
Resource askedResource, Resource minimumUnreservedResource) {
// need to unreserve some other container first
NodeId idToUnreserve = application.getNodeIdToUnreserve(priority, capability);
NodeId idToUnreserve =
application.getNodeIdToUnreserve(priority, minimumUnreservedResource,
resourceCalculator, clusterResource);
if (idToUnreserve == null) {
if (LOG.isDebugEnabled()) {
LOG.debug("checked to see if could unreserve for app but nothing "
@ -1343,7 +1264,7 @@ protected boolean findNodeToUnreserve(Resource clusterResource,
LOG.debug("unreserving for app: " + application.getApplicationId()
+ " on nodeId: " + idToUnreserve
+ " in order to replace reserved application and place it on node: "
+ node.getNodeID() + " needing: " + capability);
+ node.getNodeID() + " needing: " + askedResource);
}
// headroom
@ -1364,15 +1285,7 @@ protected boolean findNodeToUnreserve(Resource clusterResource,
@Private
protected boolean checkLimitsToReserve(Resource clusterResource,
FiCaSchedulerApp application, Resource capability,
boolean needToUnreserve) {
if (needToUnreserve) {
if (LOG.isDebugEnabled()) {
LOG.debug("we needed to unreserve to be able to allocate");
}
return false;
}
FiCaSchedulerApp application, Resource capability) {
// we can't reserve if we got here based on the limit
// checks assuming we could unreserve!!!
Resource userLimit = computeUserLimitAndSetHeadroom(application,
@ -1380,7 +1293,8 @@ protected boolean checkLimitsToReserve(Resource clusterResource,
// Check queue max-capacity limit,
// TODO: Consider reservation on labels
if (!canAssignToThisQueue(clusterResource, capability, null, application, false)) {
if (!canAssignToThisQueue(clusterResource, null,
this.currentResourceLimits, capability, Resources.none())) {
if (LOG.isDebugEnabled()) {
LOG.debug("was going to reserve but hit queue limit");
}
@ -1402,43 +1316,40 @@ protected boolean checkLimitsToReserve(Resource clusterResource,
private Resource assignNodeLocalContainers(Resource clusterResource,
ResourceRequest nodeLocalResourceRequest, FiCaSchedulerNode node,
FiCaSchedulerApp application, Priority priority,
RMContainer reservedContainer, boolean needToUnreserve,
MutableObject allocatedContainer) {
RMContainer reservedContainer, MutableObject allocatedContainer) {
if (canAssign(application, priority, node, NodeType.NODE_LOCAL,
reservedContainer)) {
return assignContainer(clusterResource, node, application, priority,
nodeLocalResourceRequest, NodeType.NODE_LOCAL, reservedContainer,
needToUnreserve, allocatedContainer);
allocatedContainer);
}
return Resources.none();
}
private Resource assignRackLocalContainers(
Resource clusterResource, ResourceRequest rackLocalResourceRequest,
FiCaSchedulerNode node, FiCaSchedulerApp application, Priority priority,
RMContainer reservedContainer, boolean needToUnreserve,
MutableObject allocatedContainer) {
private Resource assignRackLocalContainers(Resource clusterResource,
ResourceRequest rackLocalResourceRequest, FiCaSchedulerNode node,
FiCaSchedulerApp application, Priority priority,
RMContainer reservedContainer, MutableObject allocatedContainer) {
if (canAssign(application, priority, node, NodeType.RACK_LOCAL,
reservedContainer)) {
return assignContainer(clusterResource, node, application, priority,
rackLocalResourceRequest, NodeType.RACK_LOCAL, reservedContainer,
needToUnreserve, allocatedContainer);
allocatedContainer);
}
return Resources.none();
}
private Resource assignOffSwitchContainers(
Resource clusterResource, ResourceRequest offSwitchResourceRequest,
FiCaSchedulerNode node, FiCaSchedulerApp application, Priority priority,
RMContainer reservedContainer, boolean needToUnreserve,
MutableObject allocatedContainer) {
private Resource assignOffSwitchContainers(Resource clusterResource,
ResourceRequest offSwitchResourceRequest, FiCaSchedulerNode node,
FiCaSchedulerApp application, Priority priority,
RMContainer reservedContainer, MutableObject allocatedContainer) {
if (canAssign(application, priority, node, NodeType.OFF_SWITCH,
reservedContainer)) {
return assignContainer(clusterResource, node, application, priority,
offSwitchResourceRequest, NodeType.OFF_SWITCH, reservedContainer,
needToUnreserve, allocatedContainer);
allocatedContainer);
}
return Resources.none();
@ -1522,13 +1433,12 @@ Container createContainer(FiCaSchedulerApp application, FiCaSchedulerNode node,
private Resource assignContainer(Resource clusterResource, FiCaSchedulerNode node,
FiCaSchedulerApp application, Priority priority,
ResourceRequest request, NodeType type, RMContainer rmContainer,
boolean needToUnreserve, MutableObject createdContainer) {
MutableObject createdContainer) {
if (LOG.isDebugEnabled()) {
LOG.debug("assignContainers: node=" + node.getNodeName()
+ " application=" + application.getApplicationId()
+ " priority=" + priority.getPriority()
+ " request=" + request + " type=" + type
+ " needToUnreserve= " + needToUnreserve);
+ " request=" + request + " type=" + type);
}
// check if the resource request can access the label
@ -1548,12 +1458,14 @@ private Resource assignContainer(Resource clusterResource, FiCaSchedulerNode nod
Resource available = node.getAvailableResource();
Resource totalResource = node.getTotalResource();
if (!Resources.fitsIn(capability, totalResource)) {
if (!Resources.lessThanOrEqual(resourceCalculator, clusterResource,
capability, totalResource)) {
LOG.warn("Node : " + node.getNodeID()
+ " does not have sufficient resource for request : " + request
+ " node total capability : " + node.getTotalResource());
return Resources.none();
}
assert Resources.greaterThan(
resourceCalculator, clusterResource, available, Resources.none());
@ -1566,18 +1478,9 @@ private Resource assignContainer(Resource clusterResource, FiCaSchedulerNode nod
LOG.warn("Couldn't get container for allocation!");
return Resources.none();
}
// default to true since if reservation continue look feature isn't on
// needContainers is checked earlier and we wouldn't have gotten this far
boolean canAllocContainer = true;
if (this.reservationsContinueLooking) {
// based on reservations can we allocate/reserve more or do we need
// to unreserve one first
canAllocContainer = needContainers(application, priority, capability);
if (LOG.isDebugEnabled()) {
LOG.debug("can alloc container is: " + canAllocContainer);
}
}
boolean shouldAllocOrReserveNewContainer = shouldAllocOrReserveNewContainer(
application, priority, capability);
// Can we allocate a container on this node?
int availableContainers =
@ -1588,25 +1491,25 @@ private Resource assignContainer(Resource clusterResource, FiCaSchedulerNode nod
// Did we previously reserve containers at this 'priority'?
if (rmContainer != null) {
unreserve(application, priority, node, rmContainer);
} else if (this.reservationsContinueLooking
&& (!canAllocContainer || needToUnreserve)) {
// need to unreserve some other container first
boolean res = findNodeToUnreserve(clusterResource, node, application,
priority, capability);
if (!res) {
return Resources.none();
}
} else {
// we got here by possibly ignoring queue capacity limits. If the
// parameter needToUnreserve is true it means we ignored one of those
// limits in the chance we could unreserve. If we are here we aren't
// trying to unreserve so we can't allocate anymore due to that parent
// limit.
if (needToUnreserve) {
if (LOG.isDebugEnabled()) {
LOG.debug("we needed to unreserve to be able to allocate, skipping");
} else if (this.reservationsContinueLooking && node.getLabels().isEmpty()) {
// when reservationsContinueLooking is set, we may need to unreserve
// some containers to meet this queue and its parents' resource limits
// TODO, need change here when we want to support continuous reservation
// looking for labeled partitions.
Resource minimumUnreservedResource =
getMinimumResourceNeedUnreserved(capability);
if (!shouldAllocOrReserveNewContainer
|| Resources.greaterThan(resourceCalculator, clusterResource,
minimumUnreservedResource, Resources.none())) {
boolean containerUnreserved =
findNodeToUnreserve(clusterResource, node, application, priority,
capability, minimumUnreservedResource);
// When (minimum-unreserved-resource > 0 OR we cannot allocate new/reserved
// container (That means we *have to* unreserve some resource to
// continue)). If we failed to unreserve some resource,
if (!containerUnreserved) {
return Resources.none();
}
return Resources.none();
}
}
@ -1632,17 +1535,16 @@ private Resource assignContainer(Resource clusterResource, FiCaSchedulerNode nod
} else {
// if we are allowed to allocate but this node doesn't have space, reserve it or
// if this was an already a reserved container, reserve it again
if ((canAllocContainer) || (rmContainer != null)) {
if (shouldAllocOrReserveNewContainer || rmContainer != null) {
if (reservationsContinueLooking) {
// we got here by possibly ignoring parent queue capacity limits. If
// the parameter needToUnreserve is true it means we ignored one of
// those limits in the chance we could unreserve. If we are here
// we aren't trying to unreserve so we can't allocate
// anymore due to that parent limit
boolean res = checkLimitsToReserve(clusterResource, application, capability,
needToUnreserve);
if (!res) {
if (reservationsContinueLooking && rmContainer == null) {
// we could possibly ignoring parent queue capacity limits when
// reservationsContinueLooking is set.
// If we're trying to reserve a container here, not container will be
// unreserved for reserving the new one. Check limits again before
// reserve the new container
if (!checkLimitsToReserve(clusterResource,
application, capability)) {
return Resources.none();
}
}
@ -1784,18 +1686,36 @@ private void updateAbsoluteCapacityResource(Resource clusterResource) {
Resources.multiplyAndNormalizeUp(resourceCalculator, clusterResource,
queueCapacities.getAbsoluteCapacity(), minimumAllocation);
}
private void updateCurrentResourceLimits(
ResourceLimits currentResourceLimits, Resource clusterResource) {
// TODO: need consider non-empty node labels when resource limits supports
// node labels
// Even if ParentQueue will set limits respect child's max queue capacity,
// but when allocating reserved container, CapacityScheduler doesn't do
// this. So need cap limits by queue's max capacity here.
this.currentResourceLimits = currentResourceLimits;
Resource queueMaxResource =
Resources.multiplyAndNormalizeDown(resourceCalculator, labelManager
.getResourceByLabel(RMNodeLabelsManager.NO_LABEL, clusterResource),
queueCapacities
.getAbsoluteMaximumCapacity(RMNodeLabelsManager.NO_LABEL),
minimumAllocation);
this.currentResourceLimits.setLimit(Resources.min(resourceCalculator,
clusterResource, queueMaxResource, currentResourceLimits.getLimit()));
}
@Override
public synchronized void updateClusterResource(Resource clusterResource,
ResourceLimits currentResourceLimits) {
this.currentResourceLimits = currentResourceLimits;
updateCurrentResourceLimits(currentResourceLimits, clusterResource);
lastClusterResource = clusterResource;
updateAbsoluteCapacityResource(clusterResource);
// Update headroom info based on new cluster resource value
// absoluteMaxCapacity now, will be replaced with absoluteMaxAvailCapacity
// during allocation
computeQueueCurrentLimitAndSetHeadroomInfo(clusterResource);
setQueueResourceLimitsInfo(clusterResource);
// Update metrics
CSQueueUtils.updateQueueStatistics(

View File

@ -23,7 +23,6 @@
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@ -48,7 +47,6 @@
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
import org.apache.hadoop.yarn.security.AccessType;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
@ -63,8 +61,6 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.util.resource.Resources;
import com.google.common.collect.Sets;
@Private
@Evolving
public class ParentQueue extends AbstractCSQueue {
@ -380,8 +376,7 @@ private synchronized void removeApplication(ApplicationId applicationId,
@Override
public synchronized CSAssignment assignContainers(Resource clusterResource,
FiCaSchedulerNode node, boolean needToUnreserve,
ResourceLimits resourceLimits) {
FiCaSchedulerNode node, ResourceLimits resourceLimits) {
CSAssignment assignment =
new CSAssignment(Resources.createResource(0, 0), NodeType.NODE_LOCAL);
Set<String> nodeLabels = node.getLabels();
@ -397,21 +392,18 @@ public synchronized CSAssignment assignContainers(Resource clusterResource,
+ getQueueName());
}
boolean localNeedToUnreserve = false;
// Are we over maximum-capacity for this queue?
if (!canAssignToThisQueue(clusterResource, nodeLabels)) {
// check to see if we could if we unreserve first
localNeedToUnreserve = assignToQueueIfUnreserve(clusterResource);
if (!localNeedToUnreserve) {
break;
}
// This will also consider parent's limits and also continuous reservation
// looking
if (!super.canAssignToThisQueue(clusterResource, nodeLabels, resourceLimits,
minimumAllocation, Resources.createResource(getMetrics()
.getReservedMB(), getMetrics().getReservedVirtualCores()))) {
break;
}
// Schedule
CSAssignment assignedToChild =
assignContainersToChildQueues(clusterResource, node,
localNeedToUnreserve | needToUnreserve, resourceLimits);
assignContainersToChildQueues(clusterResource, node, resourceLimits);
assignment.setType(assignedToChild.getType());
// Done if no child-queue assigned anything
@ -459,74 +451,6 @@ public synchronized CSAssignment assignContainers(Resource clusterResource,
return assignment;
}
private synchronized boolean canAssignToThisQueue(Resource clusterResource,
Set<String> nodeLabels) {
Set<String> labelCanAccess =
new HashSet<String>(
accessibleLabels.contains(CommonNodeLabelsManager.ANY) ? nodeLabels
: Sets.intersection(accessibleLabels, nodeLabels));
if (nodeLabels.isEmpty()) {
// Any queue can always access any node without label
labelCanAccess.add(RMNodeLabelsManager.NO_LABEL);
}
boolean canAssign = true;
for (String label : labelCanAccess) {
float currentAbsoluteLabelUsedCapacity =
Resources.divide(resourceCalculator, clusterResource,
queueUsage.getUsed(label),
labelManager.getResourceByLabel(label, clusterResource));
// if any of the label doesn't beyond limit, we can allocate on this node
if (currentAbsoluteLabelUsedCapacity >=
queueCapacities.getAbsoluteMaximumCapacity(label)) {
if (LOG.isDebugEnabled()) {
LOG.debug(getQueueName() + " used=" + queueUsage.getUsed()
+ " current-capacity (" + queueUsage.getUsed(label) + ") "
+ " >= max-capacity ("
+ labelManager.getResourceByLabel(label, clusterResource) + ")");
}
canAssign = false;
break;
}
}
return canAssign;
}
private synchronized boolean assignToQueueIfUnreserve(Resource clusterResource) {
if (this.reservationsContinueLooking) {
// check to see if we could potentially use this node instead of a reserved
// node
Resource reservedResources = Resources.createResource(getMetrics()
.getReservedMB(), getMetrics().getReservedVirtualCores());
float capacityWithoutReservedCapacity = Resources.divide(
resourceCalculator, clusterResource,
Resources.subtract(queueUsage.getUsed(), reservedResources),
clusterResource);
if (capacityWithoutReservedCapacity <= queueCapacities
.getAbsoluteMaximumCapacity()) {
if (LOG.isDebugEnabled()) {
LOG.debug("parent: try to use reserved: " + getQueueName()
+ " usedResources: " + queueUsage.getUsed().getMemory()
+ " clusterResources: " + clusterResource.getMemory()
+ " reservedResources: " + reservedResources.getMemory()
+ " currentCapacity " + ((float) queueUsage.getUsed().getMemory())
/ clusterResource.getMemory()
+ " potentialNewWithoutReservedCapacity: "
+ capacityWithoutReservedCapacity + " ( " + " max-capacity: "
+ queueCapacities.getAbsoluteMaximumCapacity() + ")");
}
// we could potentially use this node instead of reserved node
return true;
}
}
return false;
}
private boolean canAssign(Resource clusterResource, FiCaSchedulerNode node) {
return (node.getReservedContainer() == null) &&
Resources.greaterThanOrEqual(resourceCalculator, clusterResource,
@ -534,28 +458,38 @@ private boolean canAssign(Resource clusterResource, FiCaSchedulerNode node) {
}
private ResourceLimits getResourceLimitsOfChild(CSQueue child,
Resource clusterResource, ResourceLimits myLimits) {
/*
* Set head-room of a given child, limit =
* min(minimum-of-limit-of-this-queue-and-ancestors, this.max) - this.used
* + child.used. To avoid any of this queue's and its ancestors' limit
* being violated
*/
Resource myCurrentLimit =
getCurrentResourceLimit(clusterResource, myLimits);
// My available resource = my-current-limit - my-used-resource
Resource myMaxAvailableResource = Resources.subtract(myCurrentLimit,
getUsedResources());
// Child's limit = my-available-resource + resource-already-used-by-child
Resource clusterResource, ResourceLimits parentLimits) {
// Set resource-limit of a given child, child.limit =
// min(my.limit - my.used + child.used, child.max)
// Parent available resource = parent-limit - parent-used-resource
Resource parentMaxAvailableResource =
Resources.subtract(parentLimits.getLimit(), getUsedResources());
// Child's limit = parent-available-resource + child-used
Resource childLimit =
Resources.add(myMaxAvailableResource, child.getUsedResources());
Resources.add(parentMaxAvailableResource, child.getUsedResources());
// Get child's max resource
Resource childConfiguredMaxResource =
Resources.multiplyAndNormalizeDown(resourceCalculator, labelManager
.getResourceByLabel(RMNodeLabelsManager.NO_LABEL, clusterResource),
child.getAbsoluteMaximumCapacity(), minimumAllocation);
// Child's limit should be capped by child configured max resource
childLimit =
Resources.min(resourceCalculator, clusterResource, childLimit,
childConfiguredMaxResource);
// Normalize before return
childLimit =
Resources.roundDown(resourceCalculator, childLimit, minimumAllocation);
return new ResourceLimits(childLimit);
}
private synchronized CSAssignment assignContainersToChildQueues(
Resource cluster, FiCaSchedulerNode node, boolean needToUnreserve,
ResourceLimits limits) {
Resource cluster, FiCaSchedulerNode node, ResourceLimits limits) {
CSAssignment assignment =
new CSAssignment(Resources.createResource(0, 0), NodeType.NODE_LOCAL);
@ -573,9 +507,7 @@ private synchronized CSAssignment assignContainersToChildQueues(
ResourceLimits childLimits =
getResourceLimitsOfChild(childQueue, cluster, limits);
assignment =
childQueue.assignContainers(cluster, node, needToUnreserve,
childLimits);
assignment = childQueue.assignContainers(cluster, node, childLimits);
if(LOG.isDebugEnabled()) {
LOG.debug("Assigned to queue: " + childQueue.getQueuePath() +
" stats: " + childQueue + " --> " +

View File

@ -274,7 +274,8 @@ public synchronized Allocation getAllocation(ResourceCalculator rc,
}
synchronized public NodeId getNodeIdToUnreserve(Priority priority,
Resource capability) {
Resource resourceNeedUnreserve, ResourceCalculator rc,
Resource clusterResource) {
// first go around make this algorithm simple and just grab first
// reservation that has enough resources
@ -283,16 +284,19 @@ synchronized public NodeId getNodeIdToUnreserve(Priority priority,
if ((reservedContainers != null) && (!reservedContainers.isEmpty())) {
for (Map.Entry<NodeId, RMContainer> entry : reservedContainers.entrySet()) {
NodeId nodeId = entry.getKey();
Resource containerResource = entry.getValue().getContainer().getResource();
// make sure we unreserve one with at least the same amount of
// resources, otherwise could affect capacity limits
if (Resources.fitsIn(capability, entry.getValue().getContainer()
.getResource())) {
if (Resources.lessThanOrEqual(rc, clusterResource,
resourceNeedUnreserve, containerResource)) {
if (LOG.isDebugEnabled()) {
LOG.debug("unreserving node with reservation size: "
+ entry.getValue().getContainer().getResource()
+ " in order to allocate container with size: " + capability);
+ containerResource
+ " in order to allocate container with size: " + resourceNeedUnreserve);
}
return entry.getKey();
return nodeId;
}
}
}

View File

@ -611,7 +611,7 @@ public void testHeadroom() throws Exception {
app_0_0.updateResourceRequests(app_0_0_requests);
// Schedule to compute
queue.assignContainers(clusterResource, node_0, false, new ResourceLimits(
queue.assignContainers(clusterResource, node_0, new ResourceLimits(
clusterResource));
Resource expectedHeadroom = Resources.createResource(10*16*GB, 1);
assertEquals(expectedHeadroom, app_0_0.getHeadroom());
@ -631,7 +631,7 @@ public void testHeadroom() throws Exception {
app_0_1.updateResourceRequests(app_0_1_requests);
// Schedule to compute
queue.assignContainers(clusterResource, node_0, false, new ResourceLimits(
queue.assignContainers(clusterResource, node_0, new ResourceLimits(
clusterResource)); // Schedule to compute
assertEquals(expectedHeadroom, app_0_0.getHeadroom());
assertEquals(expectedHeadroom, app_0_1.getHeadroom());// no change
@ -651,7 +651,7 @@ public void testHeadroom() throws Exception {
app_1_0.updateResourceRequests(app_1_0_requests);
// Schedule to compute
queue.assignContainers(clusterResource, node_0, false, new ResourceLimits(
queue.assignContainers(clusterResource, node_0, new ResourceLimits(
clusterResource)); // Schedule to compute
expectedHeadroom = Resources.createResource(10*16*GB / 2, 1); // changes
assertEquals(expectedHeadroom, app_0_0.getHeadroom());
@ -660,7 +660,7 @@ public void testHeadroom() throws Exception {
// Now reduce cluster size and check for the smaller headroom
clusterResource = Resources.createResource(90*16*GB);
queue.assignContainers(clusterResource, node_0, false, new ResourceLimits(
queue.assignContainers(clusterResource, node_0, new ResourceLimits(
clusterResource)); // Schedule to compute
expectedHeadroom = Resources.createResource(9*16*GB / 2, 1); // changes
assertEquals(expectedHeadroom, app_0_0.getHeadroom());

View File

@ -125,6 +125,7 @@
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ComparisonFailure;
import org.junit.Test;
import org.mockito.Mockito;
@ -2483,6 +2484,64 @@ public void testHierarchyQueuesCurrentLimits() throws Exception {
Assert.assertEquals(30 * GB,
am1.doHeartbeat().getAvailableResources().getMemory());
}
@Test
public void testParentQueueMaxCapsAreRespected() throws Exception {
/*
* Queue tree:
* Root
* / \
* A B
* / \
* A1 A2
*/
CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration();
csConf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {"a", "b"});
csConf.setCapacity(A, 50);
csConf.setMaximumCapacity(A, 50);
csConf.setCapacity(B, 50);
// Define 2nd-level queues
csConf.setQueues(A, new String[] {"a1", "a2"});
csConf.setCapacity(A1, 50);
csConf.setUserLimitFactor(A1, 100.0f);
csConf.setCapacity(A2, 50);
csConf.setUserLimitFactor(A2, 100.0f);
csConf.setCapacity(B1, B1_CAPACITY);
csConf.setUserLimitFactor(B1, 100.0f);
YarnConfiguration conf = new YarnConfiguration(csConf);
conf.setBoolean(CapacitySchedulerConfiguration.ENABLE_USER_METRICS, true);
MemoryRMStateStore memStore = new MemoryRMStateStore();
memStore.init(conf);
MockRM rm1 = new MockRM(conf, memStore);
rm1.start();
MockNM nm1 =
new MockNM("127.0.0.1:1234", 24 * GB, rm1.getResourceTrackerService());
nm1.registerNode();
// Launch app1 in a1, resource usage is 1GB (am) + 4GB * 2 = 9GB
RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a1");
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
waitContainerAllocated(am1, 4 * GB, 2, 2, rm1, nm1);
// Try to launch app2 in a2, asked 2GB, should success
RMApp app2 = rm1.submitApp(2 * GB, "app", "user", null, "a2");
MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm1);
try {
// Try to allocate a container, a's usage=11G/max=12
// a1's usage=9G/max=12
// a2's usage=2G/max=12
// In this case, if a2 asked 2G, should fail.
waitContainerAllocated(am2, 2 * GB, 1, 2, rm1, nm1);
} catch (AssertionError failure) {
// Expected, return;
return;
}
Assert.fail("Shouldn't successfully allocate containers for am2, "
+ "queue-a's max capacity will be violated if container allocated");
}
private void setMaxAllocMb(Configuration conf, int maxAllocMb) {
conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,

View File

@ -20,7 +20,6 @@
import static org.junit.Assert.assertEquals;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doNothing;
@ -145,7 +144,7 @@ public CSAssignment answer(InvocationOnMock invocation) throws Throwable {
if (allocation > 0) {
doReturn(new CSAssignment(Resources.none(), type)).
when(queue)
.assignContainers(eq(clusterResource), eq(node), anyBoolean(),
.assignContainers(eq(clusterResource), eq(node),
any(ResourceLimits.class));
// Mock the node's resource availability
@ -157,7 +156,7 @@ public CSAssignment answer(InvocationOnMock invocation) throws Throwable {
return new CSAssignment(allocatedResource, type);
}
}).
when(queue).assignContainers(eq(clusterResource), eq(node), anyBoolean(),
when(queue).assignContainers(eq(clusterResource), eq(node),
any(ResourceLimits.class));
doNothing().when(node).releaseContainer(any(Container.class));
}
@ -274,7 +273,7 @@ public void testSortedQueues() throws Exception {
stubQueueAllocation(b, clusterResource, node_0, 0*GB);
stubQueueAllocation(c, clusterResource, node_0, 0*GB);
stubQueueAllocation(d, clusterResource, node_0, 0*GB);
root.assignContainers(clusterResource, node_0, false, new ResourceLimits(
root.assignContainers(clusterResource, node_0, new ResourceLimits(
clusterResource));
for(int i=0; i < 2; i++)
{
@ -282,7 +281,7 @@ public void testSortedQueues() throws Exception {
stubQueueAllocation(b, clusterResource, node_0, 1*GB);
stubQueueAllocation(c, clusterResource, node_0, 0*GB);
stubQueueAllocation(d, clusterResource, node_0, 0*GB);
root.assignContainers(clusterResource, node_0, false, new ResourceLimits(
root.assignContainers(clusterResource, node_0, new ResourceLimits(
clusterResource));
}
for(int i=0; i < 3; i++)
@ -291,7 +290,7 @@ public void testSortedQueues() throws Exception {
stubQueueAllocation(b, clusterResource, node_0, 0*GB);
stubQueueAllocation(c, clusterResource, node_0, 1*GB);
stubQueueAllocation(d, clusterResource, node_0, 0*GB);
root.assignContainers(clusterResource, node_0, false, new ResourceLimits(
root.assignContainers(clusterResource, node_0, new ResourceLimits(
clusterResource));
}
for(int i=0; i < 4; i++)
@ -300,7 +299,7 @@ public void testSortedQueues() throws Exception {
stubQueueAllocation(b, clusterResource, node_0, 0*GB);
stubQueueAllocation(c, clusterResource, node_0, 0*GB);
stubQueueAllocation(d, clusterResource, node_0, 1*GB);
root.assignContainers(clusterResource, node_0, false, new ResourceLimits(
root.assignContainers(clusterResource, node_0, new ResourceLimits(
clusterResource));
}
verifyQueueMetrics(a, 1*GB, clusterResource);
@ -334,7 +333,7 @@ public void testSortedQueues() throws Exception {
stubQueueAllocation(b, clusterResource, node_0, 0*GB);
stubQueueAllocation(c, clusterResource, node_0, 0*GB);
stubQueueAllocation(d, clusterResource, node_0, 0*GB);
root.assignContainers(clusterResource, node_0, false, new ResourceLimits(
root.assignContainers(clusterResource, node_0, new ResourceLimits(
clusterResource));
}
verifyQueueMetrics(a, 3*GB, clusterResource);
@ -362,7 +361,7 @@ public void testSortedQueues() throws Exception {
stubQueueAllocation(b, clusterResource, node_0, 1*GB);
stubQueueAllocation(c, clusterResource, node_0, 0*GB);
stubQueueAllocation(d, clusterResource, node_0, 0*GB);
root.assignContainers(clusterResource, node_0, false, new ResourceLimits(
root.assignContainers(clusterResource, node_0, new ResourceLimits(
clusterResource));
verifyQueueMetrics(a, 2*GB, clusterResource);
verifyQueueMetrics(b, 3*GB, clusterResource);
@ -389,7 +388,7 @@ public void testSortedQueues() throws Exception {
stubQueueAllocation(b, clusterResource, node_0, 0*GB);
stubQueueAllocation(c, clusterResource, node_0, 0*GB);
stubQueueAllocation(d, clusterResource, node_0, 0*GB);
root.assignContainers(clusterResource, node_0, false, new ResourceLimits(
root.assignContainers(clusterResource, node_0, new ResourceLimits(
clusterResource));
verifyQueueMetrics(a, 3*GB, clusterResource);
verifyQueueMetrics(b, 2*GB, clusterResource);
@ -404,13 +403,13 @@ public void testSortedQueues() throws Exception {
stubQueueAllocation(b, clusterResource, node_0, 1*GB);
stubQueueAllocation(c, clusterResource, node_0, 0*GB);
stubQueueAllocation(d, clusterResource, node_0, 1*GB);
root.assignContainers(clusterResource, node_0, false, new ResourceLimits(
root.assignContainers(clusterResource, node_0, new ResourceLimits(
clusterResource));
InOrder allocationOrder = inOrder(d,b);
allocationOrder.verify(d).assignContainers(eq(clusterResource),
any(FiCaSchedulerNode.class), anyBoolean(), any(ResourceLimits.class));
any(FiCaSchedulerNode.class), any(ResourceLimits.class));
allocationOrder.verify(b).assignContainers(eq(clusterResource),
any(FiCaSchedulerNode.class), anyBoolean(), any(ResourceLimits.class));
any(FiCaSchedulerNode.class), any(ResourceLimits.class));
verifyQueueMetrics(a, 3*GB, clusterResource);
verifyQueueMetrics(b, 2*GB, clusterResource);
verifyQueueMetrics(c, 3*GB, clusterResource);

View File

@ -350,8 +350,8 @@ public void testSingleQueueOneUserMetrics() throws Exception {
// Start testing...
// Only 1 container
a.assignContainers(clusterResource, node_0, false,
new ResourceLimits(clusterResource));
a.assignContainers(clusterResource, node_0, new ResourceLimits(
clusterResource));
assertEquals(
(int)(node_0.getTotalResource().getMemory() * a.getCapacity()) - (1*GB),
a.getMetrics().getAvailableMB());
@ -486,7 +486,7 @@ public void testSingleQueueWithOneUser() throws Exception {
// Start testing...
// Only 1 container
a.assignContainers(clusterResource, node_0, false,
a.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource));
assertEquals(1*GB, a.getUsedResources().getMemory());
assertEquals(1*GB, app_0.getCurrentConsumption().getMemory());
@ -497,7 +497,7 @@ public void testSingleQueueWithOneUser() throws Exception {
// Also 2nd -> minCapacity = 1024 since (.1 * 8G) < minAlloc, also
// you can get one container more than user-limit
a.assignContainers(clusterResource, node_0, false,
a.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource));
assertEquals(2*GB, a.getUsedResources().getMemory());
assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
@ -506,7 +506,7 @@ public void testSingleQueueWithOneUser() throws Exception {
assertEquals(2*GB, a.getMetrics().getAllocatedMB());
// Can't allocate 3rd due to user-limit
a.assignContainers(clusterResource, node_0, false,
a.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource));
assertEquals(2*GB, a.getUsedResources().getMemory());
assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
@ -516,7 +516,7 @@ public void testSingleQueueWithOneUser() throws Exception {
// Bump up user-limit-factor, now allocate should work
a.setUserLimitFactor(10);
a.assignContainers(clusterResource, node_0, false,
a.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource));
assertEquals(3*GB, a.getUsedResources().getMemory());
assertEquals(3*GB, app_0.getCurrentConsumption().getMemory());
@ -525,7 +525,7 @@ public void testSingleQueueWithOneUser() throws Exception {
assertEquals(3*GB, a.getMetrics().getAllocatedMB());
// One more should work, for app_1, due to user-limit-factor
a.assignContainers(clusterResource, node_0, false,
a.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource));
assertEquals(4*GB, a.getUsedResources().getMemory());
assertEquals(3*GB, app_0.getCurrentConsumption().getMemory());
@ -536,8 +536,8 @@ public void testSingleQueueWithOneUser() throws Exception {
// Test max-capacity
// Now - no more allocs since we are at max-cap
a.setMaxCapacity(0.5f);
a.assignContainers(clusterResource, node_0, false,
new ResourceLimits(clusterResource));
a.assignContainers(clusterResource, node_0, new ResourceLimits(
clusterResource));
assertEquals(4*GB, a.getUsedResources().getMemory());
assertEquals(3*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(1*GB, app_1.getCurrentConsumption().getMemory());
@ -652,21 +652,21 @@ public void testUserLimits() throws Exception {
// recordFactory)));
// 1 container to user_0
a.assignContainers(clusterResource, node_0, false,
a.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource));
assertEquals(2*GB, a.getUsedResources().getMemory());
assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
// Again one to user_0 since he hasn't exceeded user limit yet
a.assignContainers(clusterResource, node_0, false,
a.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource));
assertEquals(3*GB, a.getUsedResources().getMemory());
assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(1*GB, app_1.getCurrentConsumption().getMemory());
// One more to user_0 since he is the only active user
a.assignContainers(clusterResource, node_1, false,
a.assignContainers(clusterResource, node_1,
new ResourceLimits(clusterResource));
assertEquals(4*GB, a.getUsedResources().getMemory());
assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
@ -718,7 +718,7 @@ public void testComputeUserLimitAndSetHeadroom(){
assertEquals("There should only be 1 active user!",
1, qb.getActiveUsersManager().getNumActiveUsers());
//get headroom
qb.assignContainers(clusterResource, node_0, false,
qb.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource));
qb.computeUserLimitAndSetHeadroom(app_0, clusterResource, app_0
.getResourceRequest(u0Priority, ResourceRequest.ANY).getCapability(),
@ -738,7 +738,7 @@ public void testComputeUserLimitAndSetHeadroom(){
TestUtils.createResourceRequest(ResourceRequest.ANY, 4*GB, 1, true,
u1Priority, recordFactory)));
qb.submitApplicationAttempt(app_2, user_1);
qb.assignContainers(clusterResource, node_1, false,
qb.assignContainers(clusterResource, node_1,
new ResourceLimits(clusterResource));
qb.computeUserLimitAndSetHeadroom(app_0, clusterResource, app_0
.getResourceRequest(u0Priority, ResourceRequest.ANY).getCapability(),
@ -781,9 +781,9 @@ public void testComputeUserLimitAndSetHeadroom(){
u1Priority, recordFactory)));
qb.submitApplicationAttempt(app_1, user_0);
qb.submitApplicationAttempt(app_3, user_1);
qb.assignContainers(clusterResource, node_0, false,
qb.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource));
qb.assignContainers(clusterResource, node_0, false,
qb.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource));
qb.computeUserLimitAndSetHeadroom(app_3, clusterResource, app_3
.getResourceRequest(u1Priority, ResourceRequest.ANY).getCapability(),
@ -802,7 +802,7 @@ public void testComputeUserLimitAndSetHeadroom(){
app_4.updateResourceRequests(Collections.singletonList(
TestUtils.createResourceRequest(ResourceRequest.ANY, 6*GB, 1, true,
u0Priority, recordFactory)));
qb.assignContainers(clusterResource, node_1, false,
qb.assignContainers(clusterResource, node_1,
new ResourceLimits(clusterResource));
qb.computeUserLimitAndSetHeadroom(app_4, clusterResource, app_4
.getResourceRequest(u0Priority, ResourceRequest.ANY).getCapability(),
@ -875,7 +875,7 @@ public void testUserHeadroomMultiApp() throws Exception {
TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 1, true,
priority, recordFactory)));
a.assignContainers(clusterResource, node_0, false,
a.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource));
assertEquals(1*GB, a.getUsedResources().getMemory());
assertEquals(1*GB, app_0.getCurrentConsumption().getMemory());
@ -892,7 +892,7 @@ public void testUserHeadroomMultiApp() throws Exception {
TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 2, true,
priority, recordFactory)));
a.assignContainers(clusterResource, node_0, false,
a.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource));
assertEquals(2*GB, a.getUsedResources().getMemory());
assertEquals(1*GB, app_0.getCurrentConsumption().getMemory());
@ -981,7 +981,7 @@ public void testHeadroomWithMaxCap() throws Exception {
1, a.getActiveUsersManager().getNumActiveUsers());
// 1 container to user_0
a.assignContainers(clusterResource, node_0, false,
a.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource));
assertEquals(2*GB, a.getUsedResources().getMemory());
assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
@ -992,7 +992,7 @@ public void testHeadroomWithMaxCap() throws Exception {
// the application is not yet active
// Again one to user_0 since he hasn't exceeded user limit yet
a.assignContainers(clusterResource, node_0, false,
a.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource));
assertEquals(3*GB, a.getUsedResources().getMemory());
assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
@ -1009,7 +1009,7 @@ public void testHeadroomWithMaxCap() throws Exception {
// No more to user_0 since he is already over user-limit
// and no more containers to queue since it's already at max-cap
a.assignContainers(clusterResource, node_1, false,
a.assignContainers(clusterResource, node_1,
new ResourceLimits(clusterResource));
assertEquals(3*GB, a.getUsedResources().getMemory());
assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
@ -1023,7 +1023,7 @@ public void testHeadroomWithMaxCap() throws Exception {
TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 0, true,
priority, recordFactory)));
assertEquals(1, a.getActiveUsersManager().getNumActiveUsers());
a.assignContainers(clusterResource, node_1, false,
a.assignContainers(clusterResource, node_1,
new ResourceLimits(clusterResource));
assertEquals(0*GB, app_2.getHeadroom().getMemory()); // hit queue max-cap
}
@ -1094,7 +1094,7 @@ public void testSingleQueueWithMultipleUsers() throws Exception {
*/
// Only 1 container
a.assignContainers(clusterResource, node_0, false,
a.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource));
assertEquals(1*GB, a.getUsedResources().getMemory());
assertEquals(1*GB, app_0.getCurrentConsumption().getMemory());
@ -1102,7 +1102,7 @@ public void testSingleQueueWithMultipleUsers() throws Exception {
// Also 2nd -> minCapacity = 1024 since (.1 * 8G) < minAlloc, also
// you can get one container more than user-limit
a.assignContainers(clusterResource, node_0, false,
a.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource));
assertEquals(2*GB, a.getUsedResources().getMemory());
assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
@ -1110,7 +1110,7 @@ public void testSingleQueueWithMultipleUsers() throws Exception {
// Can't allocate 3rd due to user-limit
a.setUserLimit(25);
a.assignContainers(clusterResource, node_0, false,
a.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource));
assertEquals(2*GB, a.getUsedResources().getMemory());
assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
@ -1129,7 +1129,7 @@ public void testSingleQueueWithMultipleUsers() throws Exception {
// Now allocations should goto app_2 since
// user_0 is at limit inspite of high user-limit-factor
a.setUserLimitFactor(10);
a.assignContainers(clusterResource, node_0, false,
a.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource));
assertEquals(5*GB, a.getUsedResources().getMemory());
assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
@ -1139,7 +1139,7 @@ public void testSingleQueueWithMultipleUsers() throws Exception {
// Now allocations should goto app_0 since
// user_0 is at user-limit not above it
a.assignContainers(clusterResource, node_0, false,
a.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource));
assertEquals(6*GB, a.getUsedResources().getMemory());
assertEquals(3*GB, app_0.getCurrentConsumption().getMemory());
@ -1150,7 +1150,7 @@ public void testSingleQueueWithMultipleUsers() throws Exception {
// Test max-capacity
// Now - no more allocs since we are at max-cap
a.setMaxCapacity(0.5f);
a.assignContainers(clusterResource, node_0, false,
a.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource));
assertEquals(6*GB, a.getUsedResources().getMemory());
assertEquals(3*GB, app_0.getCurrentConsumption().getMemory());
@ -1162,7 +1162,7 @@ public void testSingleQueueWithMultipleUsers() throws Exception {
// Now, allocations should goto app_3 since it's under user-limit
a.setMaxCapacity(1.0f);
a.setUserLimitFactor(1);
a.assignContainers(clusterResource, node_0, false,
a.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource));
assertEquals(7*GB, a.getUsedResources().getMemory());
assertEquals(3*GB, app_0.getCurrentConsumption().getMemory());
@ -1171,7 +1171,7 @@ public void testSingleQueueWithMultipleUsers() throws Exception {
assertEquals(1*GB, app_3.getCurrentConsumption().getMemory());
// Now we should assign to app_3 again since user_2 is under user-limit
a.assignContainers(clusterResource, node_0, false,
a.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource));
assertEquals(8*GB, a.getUsedResources().getMemory());
assertEquals(3*GB, app_0.getCurrentConsumption().getMemory());
@ -1271,7 +1271,7 @@ public void testReservation() throws Exception {
// Start testing...
// Only 1 container
a.assignContainers(clusterResource, node_0, false,
a.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource));
assertEquals(1*GB, a.getUsedResources().getMemory());
assertEquals(1*GB, app_0.getCurrentConsumption().getMemory());
@ -1282,7 +1282,7 @@ public void testReservation() throws Exception {
// Also 2nd -> minCapacity = 1024 since (.1 * 8G) < minAlloc, also
// you can get one container more than user-limit
a.assignContainers(clusterResource, node_0, false,
a.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource));
assertEquals(2*GB, a.getUsedResources().getMemory());
assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
@ -1291,7 +1291,7 @@ public void testReservation() throws Exception {
assertEquals(2*GB, a.getMetrics().getAllocatedMB());
// Now, reservation should kick in for app_1
a.assignContainers(clusterResource, node_0, false,
a.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource));
assertEquals(6*GB, a.getUsedResources().getMemory());
assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
@ -1308,7 +1308,7 @@ public void testReservation() throws Exception {
ContainerState.COMPLETE, "",
ContainerExitStatus.KILLED_BY_RESOURCEMANAGER),
RMContainerEventType.KILL, null, true);
a.assignContainers(clusterResource, node_0, false,
a.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource));
assertEquals(5*GB, a.getUsedResources().getMemory());
assertEquals(1*GB, app_0.getCurrentConsumption().getMemory());
@ -1325,7 +1325,7 @@ public void testReservation() throws Exception {
ContainerState.COMPLETE, "",
ContainerExitStatus.KILLED_BY_RESOURCEMANAGER),
RMContainerEventType.KILL, null, true);
a.assignContainers(clusterResource, node_0, false,
a.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource));
assertEquals(4*GB, a.getUsedResources().getMemory());
assertEquals(0*GB, app_0.getCurrentConsumption().getMemory());
@ -1393,7 +1393,7 @@ public void testStolenReservedContainer() throws Exception {
// Start testing...
a.assignContainers(clusterResource, node_0, false,
a.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource));
assertEquals(2*GB, a.getUsedResources().getMemory());
assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
@ -1403,7 +1403,7 @@ public void testStolenReservedContainer() throws Exception {
assertEquals(0*GB, a.getMetrics().getAvailableMB());
// Now, reservation should kick in for app_1
a.assignContainers(clusterResource, node_0, false,
a.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource));
assertEquals(6*GB, a.getUsedResources().getMemory());
assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
@ -1417,7 +1417,7 @@ public void testStolenReservedContainer() throws Exception {
// We do not need locality delay here
doReturn(-1).when(a).getNodeLocalityDelay();
a.assignContainers(clusterResource, node_1, false,
a.assignContainers(clusterResource, node_1,
new ResourceLimits(clusterResource));
assertEquals(10*GB, a.getUsedResources().getMemory());
assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
@ -1434,7 +1434,7 @@ public void testStolenReservedContainer() throws Exception {
ContainerState.COMPLETE, "",
ContainerExitStatus.KILLED_BY_RESOURCEMANAGER),
RMContainerEventType.KILL, null, true);
a.assignContainers(clusterResource, node_0, false,
a.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource));
assertEquals(8*GB, a.getUsedResources().getMemory());
assertEquals(0*GB, app_0.getCurrentConsumption().getMemory());
@ -1503,7 +1503,7 @@ public void testReservationExchange() throws Exception {
// Start testing...
// Only 1 container
a.assignContainers(clusterResource, node_0, false,
a.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource));
assertEquals(1*GB, a.getUsedResources().getMemory());
assertEquals(1*GB, app_0.getCurrentConsumption().getMemory());
@ -1511,14 +1511,14 @@ public void testReservationExchange() throws Exception {
// Also 2nd -> minCapacity = 1024 since (.1 * 8G) < minAlloc, also
// you can get one container more than user-limit
a.assignContainers(clusterResource, node_0, false,
a.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource));
assertEquals(2*GB, a.getUsedResources().getMemory());
assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
// Now, reservation should kick in for app_1
a.assignContainers(clusterResource, node_0, false,
a.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource));
assertEquals(6*GB, a.getUsedResources().getMemory());
assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
@ -1533,7 +1533,7 @@ public void testReservationExchange() throws Exception {
ContainerState.COMPLETE, "",
ContainerExitStatus.KILLED_BY_RESOURCEMANAGER),
RMContainerEventType.KILL, null, true);
a.assignContainers(clusterResource, node_0, false,
a.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource));
assertEquals(5*GB, a.getUsedResources().getMemory());
assertEquals(1*GB, app_0.getCurrentConsumption().getMemory());
@ -1543,7 +1543,7 @@ public void testReservationExchange() throws Exception {
assertEquals(1, app_1.getReReservations(priority));
// Re-reserve
a.assignContainers(clusterResource, node_0, false,
a.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource));
assertEquals(5*GB, a.getUsedResources().getMemory());
assertEquals(1*GB, app_0.getCurrentConsumption().getMemory());
@ -1553,7 +1553,7 @@ public void testReservationExchange() throws Exception {
assertEquals(2, app_1.getReReservations(priority));
// Try to schedule on node_1 now, should *move* the reservation
a.assignContainers(clusterResource, node_1, false,
a.assignContainers(clusterResource, node_1,
new ResourceLimits(clusterResource));
assertEquals(9*GB, a.getUsedResources().getMemory());
assertEquals(1*GB, app_0.getCurrentConsumption().getMemory());
@ -1571,7 +1571,7 @@ public void testReservationExchange() throws Exception {
ContainerState.COMPLETE, "",
ContainerExitStatus.KILLED_BY_RESOURCEMANAGER),
RMContainerEventType.KILL, null, true);
CSAssignment assignment = a.assignContainers(clusterResource, node_0, false,
CSAssignment assignment = a.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource));
assertEquals(8*GB, a.getUsedResources().getMemory());
assertEquals(0*GB, app_0.getCurrentConsumption().getMemory());
@ -1643,7 +1643,7 @@ public void testLocalityScheduling() throws Exception {
CSAssignment assignment = null;
// Start with off switch, shouldn't allocate due to delay scheduling
assignment = a.assignContainers(clusterResource, node_2, false,
assignment = a.assignContainers(clusterResource, node_2,
new ResourceLimits(clusterResource));
verify(app_0, never()).allocate(any(NodeType.class), eq(node_2),
any(Priority.class), any(ResourceRequest.class), any(Container.class));
@ -1652,7 +1652,7 @@ public void testLocalityScheduling() throws Exception {
assertEquals(NodeType.NODE_LOCAL, assignment.getType()); // None->NODE_LOCAL
// Another off switch, shouldn't allocate due to delay scheduling
assignment = a.assignContainers(clusterResource, node_2, false,
assignment = a.assignContainers(clusterResource, node_2,
new ResourceLimits(clusterResource));
verify(app_0, never()).allocate(any(NodeType.class), eq(node_2),
any(Priority.class), any(ResourceRequest.class), any(Container.class));
@ -1661,7 +1661,7 @@ public void testLocalityScheduling() throws Exception {
assertEquals(NodeType.NODE_LOCAL, assignment.getType()); // None->NODE_LOCAL
// Another off switch, shouldn't allocate due to delay scheduling
assignment = a.assignContainers(clusterResource, node_2, false,
assignment = a.assignContainers(clusterResource, node_2,
new ResourceLimits(clusterResource));
verify(app_0, never()).allocate(any(NodeType.class), eq(node_2),
any(Priority.class), any(ResourceRequest.class), any(Container.class));
@ -1671,7 +1671,7 @@ public void testLocalityScheduling() throws Exception {
// Another off switch, now we should allocate
// since missedOpportunities=3 and reqdContainers=3
assignment = a.assignContainers(clusterResource, node_2, false,
assignment = a.assignContainers(clusterResource, node_2,
new ResourceLimits(clusterResource));
verify(app_0).allocate(eq(NodeType.OFF_SWITCH), eq(node_2),
any(Priority.class), any(ResourceRequest.class), any(Container.class));
@ -1680,7 +1680,7 @@ public void testLocalityScheduling() throws Exception {
assertEquals(NodeType.OFF_SWITCH, assignment.getType());
// NODE_LOCAL - node_0
assignment = a.assignContainers(clusterResource, node_0, false,
assignment = a.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource));
verify(app_0).allocate(eq(NodeType.NODE_LOCAL), eq(node_0),
any(Priority.class), any(ResourceRequest.class), any(Container.class));
@ -1689,7 +1689,7 @@ public void testLocalityScheduling() throws Exception {
assertEquals(NodeType.NODE_LOCAL, assignment.getType());
// NODE_LOCAL - node_1
assignment = a.assignContainers(clusterResource, node_1, false,
assignment = a.assignContainers(clusterResource, node_1,
new ResourceLimits(clusterResource));
verify(app_0).allocate(eq(NodeType.NODE_LOCAL), eq(node_1),
any(Priority.class), any(ResourceRequest.class), any(Container.class));
@ -1718,14 +1718,14 @@ public void testLocalityScheduling() throws Exception {
doReturn(1).when(a).getNodeLocalityDelay();
// Shouldn't assign RACK_LOCAL yet
assignment = a.assignContainers(clusterResource, node_3, false,
assignment = a.assignContainers(clusterResource, node_3,
new ResourceLimits(clusterResource));
assertEquals(1, app_0.getSchedulingOpportunities(priority));
assertEquals(2, app_0.getTotalRequiredResources(priority));
assertEquals(NodeType.NODE_LOCAL, assignment.getType()); // None->NODE_LOCAL
// Should assign RACK_LOCAL now
assignment = a.assignContainers(clusterResource, node_3, false,
assignment = a.assignContainers(clusterResource, node_3,
new ResourceLimits(clusterResource));
verify(app_0).allocate(eq(NodeType.RACK_LOCAL), eq(node_3),
any(Priority.class), any(ResourceRequest.class), any(Container.class));
@ -1807,7 +1807,7 @@ public void testApplicationPriorityScheduling() throws Exception {
// Start with off switch, shouldn't allocate P1 due to delay scheduling
// thus, no P2 either!
a.assignContainers(clusterResource, node_2, false,
a.assignContainers(clusterResource, node_2,
new ResourceLimits(clusterResource));
verify(app_0, never()).allocate(any(NodeType.class), eq(node_2),
eq(priority_1), any(ResourceRequest.class), any(Container.class));
@ -1820,7 +1820,7 @@ public void testApplicationPriorityScheduling() throws Exception {
// Another off-switch, shouldn't allocate P1 due to delay scheduling
// thus, no P2 either!
a.assignContainers(clusterResource, node_2, false,
a.assignContainers(clusterResource, node_2,
new ResourceLimits(clusterResource));
verify(app_0, never()).allocate(any(NodeType.class), eq(node_2),
eq(priority_1), any(ResourceRequest.class), any(Container.class));
@ -1832,7 +1832,7 @@ public void testApplicationPriorityScheduling() throws Exception {
assertEquals(1, app_0.getTotalRequiredResources(priority_2));
// Another off-switch, shouldn't allocate OFF_SWITCH P1
a.assignContainers(clusterResource, node_2, false,
a.assignContainers(clusterResource, node_2,
new ResourceLimits(clusterResource));
verify(app_0).allocate(eq(NodeType.OFF_SWITCH), eq(node_2),
eq(priority_1), any(ResourceRequest.class), any(Container.class));
@ -1844,7 +1844,7 @@ public void testApplicationPriorityScheduling() throws Exception {
assertEquals(1, app_0.getTotalRequiredResources(priority_2));
// Now, DATA_LOCAL for P1
a.assignContainers(clusterResource, node_0, false,
a.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource));
verify(app_0).allocate(eq(NodeType.NODE_LOCAL), eq(node_0),
eq(priority_1), any(ResourceRequest.class), any(Container.class));
@ -1856,7 +1856,7 @@ public void testApplicationPriorityScheduling() throws Exception {
assertEquals(1, app_0.getTotalRequiredResources(priority_2));
// Now, OFF_SWITCH for P2
a.assignContainers(clusterResource, node_1, false,
a.assignContainers(clusterResource, node_1,
new ResourceLimits(clusterResource));
verify(app_0, never()).allocate(any(NodeType.class), eq(node_1),
eq(priority_1), any(ResourceRequest.class), any(Container.class));
@ -1933,7 +1933,7 @@ public void testSchedulingConstraints() throws Exception {
app_0.updateResourceRequests(app_0_requests_0);
// NODE_LOCAL - node_0_1
a.assignContainers(clusterResource, node_0_0, false,
a.assignContainers(clusterResource, node_0_0,
new ResourceLimits(clusterResource));
verify(app_0).allocate(eq(NodeType.NODE_LOCAL), eq(node_0_0),
any(Priority.class), any(ResourceRequest.class), any(Container.class));
@ -1942,7 +1942,7 @@ public void testSchedulingConstraints() throws Exception {
// No allocation on node_1_0 even though it's node/rack local since
// required(ANY) == 0
a.assignContainers(clusterResource, node_1_0, false,
a.assignContainers(clusterResource, node_1_0,
new ResourceLimits(clusterResource));
verify(app_0, never()).allocate(any(NodeType.class), eq(node_1_0),
any(Priority.class), any(ResourceRequest.class), any(Container.class));
@ -1959,7 +1959,7 @@ public void testSchedulingConstraints() throws Exception {
// No allocation on node_0_1 even though it's node/rack local since
// required(rack_1) == 0
a.assignContainers(clusterResource, node_0_1, false,
a.assignContainers(clusterResource, node_0_1,
new ResourceLimits(clusterResource));
verify(app_0, never()).allocate(any(NodeType.class), eq(node_1_0),
any(Priority.class), any(ResourceRequest.class), any(Container.class));
@ -1967,7 +1967,7 @@ public void testSchedulingConstraints() throws Exception {
assertEquals(1, app_0.getTotalRequiredResources(priority));
// NODE_LOCAL - node_1
a.assignContainers(clusterResource, node_1_0, false,
a.assignContainers(clusterResource, node_1_0,
new ResourceLimits(clusterResource));
verify(app_0).allocate(eq(NodeType.NODE_LOCAL), eq(node_1_0),
any(Priority.class), any(ResourceRequest.class), any(Container.class));
@ -2220,7 +2220,7 @@ public void testLocalityConstraints() throws Exception {
// node_0_1
// Shouldn't allocate since RR(rack_0) = null && RR(ANY) = relax: false
a.assignContainers(clusterResource, node_0_1, false,
a.assignContainers(clusterResource, node_0_1,
new ResourceLimits(clusterResource));
verify(app_0, never()).allocate(any(NodeType.class), eq(node_0_1),
any(Priority.class), any(ResourceRequest.class), any(Container.class));
@ -2243,7 +2243,7 @@ public void testLocalityConstraints() throws Exception {
// node_1_1
// Shouldn't allocate since RR(rack_1) = relax: false
a.assignContainers(clusterResource, node_1_1, false,
a.assignContainers(clusterResource, node_1_1,
new ResourceLimits(clusterResource));
verify(app_0, never()).allocate(any(NodeType.class), eq(node_0_1),
any(Priority.class), any(ResourceRequest.class), any(Container.class));
@ -2274,7 +2274,7 @@ public void testLocalityConstraints() throws Exception {
// node_1_1
// Shouldn't allocate since node_1_1 is blacklisted
a.assignContainers(clusterResource, node_1_1, false,
a.assignContainers(clusterResource, node_1_1,
new ResourceLimits(clusterResource));
verify(app_0, never()).allocate(any(NodeType.class), eq(node_1_1),
any(Priority.class), any(ResourceRequest.class), any(Container.class));
@ -2303,7 +2303,7 @@ public void testLocalityConstraints() throws Exception {
// node_1_1
// Shouldn't allocate since rack_1 is blacklisted
a.assignContainers(clusterResource, node_1_1, false,
a.assignContainers(clusterResource, node_1_1,
new ResourceLimits(clusterResource));
verify(app_0, never()).allocate(any(NodeType.class), eq(node_1_1),
any(Priority.class), any(ResourceRequest.class), any(Container.class));
@ -2330,7 +2330,7 @@ public void testLocalityConstraints() throws Exception {
// Blacklist: < host_0_0 > <----
// Now, should allocate since RR(rack_1) = relax: true
a.assignContainers(clusterResource, node_1_1, false,
a.assignContainers(clusterResource, node_1_1,
new ResourceLimits(clusterResource));
verify(app_0,never()).allocate(eq(NodeType.RACK_LOCAL), eq(node_1_1),
any(Priority.class), any(ResourceRequest.class), any(Container.class));
@ -2361,7 +2361,7 @@ public void testLocalityConstraints() throws Exception {
// host_1_0: 8G
// host_1_1: 7G
a.assignContainers(clusterResource, node_1_0, false,
a.assignContainers(clusterResource, node_1_0,
new ResourceLimits(clusterResource));
verify(app_0).allocate(eq(NodeType.NODE_LOCAL), eq(node_1_0),
any(Priority.class), any(ResourceRequest.class), any(Container.class));
@ -2444,7 +2444,7 @@ public void testAllocateContainerOnNodeWithoutOffSwitchSpecified()
recordFactory)));
try {
a.assignContainers(clusterResource, node_0, false,
a.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource));
} catch (NullPointerException e) {
Assert.fail("NPE when allocating container on node but "

View File

@ -156,7 +156,7 @@ public CSAssignment answer(InvocationOnMock invocation) throws Throwable {
// Next call - nothing
if (allocation > 0) {
doReturn(new CSAssignment(Resources.none(), type)).when(queue)
.assignContainers(eq(clusterResource), eq(node), eq(false),
.assignContainers(eq(clusterResource), eq(node),
any(ResourceLimits.class));
// Mock the node's resource availability
@ -167,8 +167,7 @@ public CSAssignment answer(InvocationOnMock invocation) throws Throwable {
return new CSAssignment(allocatedResource, type);
}
}).
when(queue).assignContainers(eq(clusterResource), eq(node), eq(false),
}).when(queue).assignContainers(eq(clusterResource), eq(node),
any(ResourceLimits.class));
}
@ -232,7 +231,7 @@ public void testSingleLevelQueues() throws Exception {
// Simulate B returning a container on node_0
stubQueueAllocation(a, clusterResource, node_0, 0*GB);
stubQueueAllocation(b, clusterResource, node_0, 1*GB);
root.assignContainers(clusterResource, node_0, false,
root.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource));
verifyQueueMetrics(a, 0*GB, clusterResource);
verifyQueueMetrics(b, 1*GB, clusterResource);
@ -240,13 +239,13 @@ public void testSingleLevelQueues() throws Exception {
// Now, A should get the scheduling opportunity since A=0G/6G, B=1G/14G
stubQueueAllocation(a, clusterResource, node_1, 2*GB);
stubQueueAllocation(b, clusterResource, node_1, 1*GB);
root.assignContainers(clusterResource, node_1, false,
root.assignContainers(clusterResource, node_1,
new ResourceLimits(clusterResource));
InOrder allocationOrder = inOrder(a, b);
allocationOrder.verify(a).assignContainers(eq(clusterResource),
any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits());
any(FiCaSchedulerNode.class), anyResourceLimits());
allocationOrder.verify(b).assignContainers(eq(clusterResource),
any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits());
any(FiCaSchedulerNode.class), anyResourceLimits());
verifyQueueMetrics(a, 2*GB, clusterResource);
verifyQueueMetrics(b, 2*GB, clusterResource);
@ -254,13 +253,13 @@ public void testSingleLevelQueues() throws Exception {
// since A has 2/6G while B has 2/14G
stubQueueAllocation(a, clusterResource, node_0, 1*GB);
stubQueueAllocation(b, clusterResource, node_0, 2*GB);
root.assignContainers(clusterResource, node_0, false,
root.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource));
allocationOrder = inOrder(b, a);
allocationOrder.verify(b).assignContainers(eq(clusterResource),
any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits());
any(FiCaSchedulerNode.class), anyResourceLimits());
allocationOrder.verify(a).assignContainers(eq(clusterResource),
any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits());
any(FiCaSchedulerNode.class), anyResourceLimits());
verifyQueueMetrics(a, 3*GB, clusterResource);
verifyQueueMetrics(b, 4*GB, clusterResource);
@ -268,13 +267,13 @@ public void testSingleLevelQueues() throws Exception {
// since A has 3/6G while B has 4/14G
stubQueueAllocation(a, clusterResource, node_0, 0*GB);
stubQueueAllocation(b, clusterResource, node_0, 4*GB);
root.assignContainers(clusterResource, node_0, false,
root.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource));
allocationOrder = inOrder(b, a);
allocationOrder.verify(b).assignContainers(eq(clusterResource),
any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits());
any(FiCaSchedulerNode.class), anyResourceLimits());
allocationOrder.verify(a).assignContainers(eq(clusterResource),
any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits());
any(FiCaSchedulerNode.class), anyResourceLimits());
verifyQueueMetrics(a, 3*GB, clusterResource);
verifyQueueMetrics(b, 8*GB, clusterResource);
@ -282,13 +281,13 @@ public void testSingleLevelQueues() throws Exception {
// since A has 3/6G while B has 8/14G
stubQueueAllocation(a, clusterResource, node_1, 1*GB);
stubQueueAllocation(b, clusterResource, node_1, 1*GB);
root.assignContainers(clusterResource, node_1, false,
root.assignContainers(clusterResource, node_1,
new ResourceLimits(clusterResource));
allocationOrder = inOrder(a, b);
allocationOrder.verify(b).assignContainers(eq(clusterResource),
any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits());
any(FiCaSchedulerNode.class), anyResourceLimits());
allocationOrder.verify(a).assignContainers(eq(clusterResource),
any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits());
any(FiCaSchedulerNode.class), anyResourceLimits());
verifyQueueMetrics(a, 4*GB, clusterResource);
verifyQueueMetrics(b, 9*GB, clusterResource);
}
@ -405,6 +404,22 @@ private void setupMultiLevelQueues(CapacitySchedulerConfiguration conf) {
@Test
public void testMultiLevelQueues() throws Exception {
/*
* Structure of queue:
* Root
* ____________
* / | \ \
* A B C D
* / | / | \ \
* A1 A2 B1 B2 B3 C1
* \
* C11
* \
* C111
* \
* C1111
*/
// Setup queue configs
setupMultiLevelQueues(csConf);
@ -449,7 +464,7 @@ public void testMultiLevelQueues() throws Exception {
stubQueueAllocation(b, clusterResource, node_0, 0*GB);
stubQueueAllocation(c, clusterResource, node_0, 1*GB);
stubQueueAllocation(d, clusterResource, node_0, 0*GB);
root.assignContainers(clusterResource, node_0, false,
root.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource));
verifyQueueMetrics(a, 0*GB, clusterResource);
verifyQueueMetrics(b, 0*GB, clusterResource);
@ -462,7 +477,7 @@ public void testMultiLevelQueues() throws Exception {
stubQueueAllocation(a, clusterResource, node_1, 0*GB);
stubQueueAllocation(b2, clusterResource, node_1, 4*GB);
stubQueueAllocation(c, clusterResource, node_1, 0*GB);
root.assignContainers(clusterResource, node_1, false,
root.assignContainers(clusterResource, node_1,
new ResourceLimits(clusterResource));
verifyQueueMetrics(a, 0*GB, clusterResource);
verifyQueueMetrics(b, 4*GB, clusterResource);
@ -474,15 +489,15 @@ public void testMultiLevelQueues() throws Exception {
stubQueueAllocation(a1, clusterResource, node_0, 1*GB);
stubQueueAllocation(b3, clusterResource, node_0, 2*GB);
stubQueueAllocation(c, clusterResource, node_0, 2*GB);
root.assignContainers(clusterResource, node_0, false,
root.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource));
InOrder allocationOrder = inOrder(a, c, b);
allocationOrder.verify(a).assignContainers(eq(clusterResource),
any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits());
any(FiCaSchedulerNode.class), anyResourceLimits());
allocationOrder.verify(c).assignContainers(eq(clusterResource),
any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits());
any(FiCaSchedulerNode.class), anyResourceLimits());
allocationOrder.verify(b).assignContainers(eq(clusterResource),
any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits());
any(FiCaSchedulerNode.class), anyResourceLimits());
verifyQueueMetrics(a, 1*GB, clusterResource);
verifyQueueMetrics(b, 6*GB, clusterResource);
verifyQueueMetrics(c, 3*GB, clusterResource);
@ -501,17 +516,17 @@ public void testMultiLevelQueues() throws Exception {
stubQueueAllocation(b3, clusterResource, node_2, 1*GB);
stubQueueAllocation(b1, clusterResource, node_2, 1*GB);
stubQueueAllocation(c, clusterResource, node_2, 1*GB);
root.assignContainers(clusterResource, node_2, false,
root.assignContainers(clusterResource, node_2,
new ResourceLimits(clusterResource));
allocationOrder = inOrder(a, a2, a1, b, c);
allocationOrder.verify(a).assignContainers(eq(clusterResource),
any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits());
any(FiCaSchedulerNode.class), anyResourceLimits());
allocationOrder.verify(a2).assignContainers(eq(clusterResource),
any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits());
any(FiCaSchedulerNode.class), anyResourceLimits());
allocationOrder.verify(b).assignContainers(eq(clusterResource),
any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits());
any(FiCaSchedulerNode.class), anyResourceLimits());
allocationOrder.verify(c).assignContainers(eq(clusterResource),
any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits());
any(FiCaSchedulerNode.class), anyResourceLimits());
verifyQueueMetrics(a, 3*GB, clusterResource);
verifyQueueMetrics(b, 8*GB, clusterResource);
verifyQueueMetrics(c, 4*GB, clusterResource);
@ -611,7 +626,7 @@ public void testOffSwitchScheduling() throws Exception {
// Simulate B returning a container on node_0
stubQueueAllocation(a, clusterResource, node_0, 0*GB, NodeType.OFF_SWITCH);
stubQueueAllocation(b, clusterResource, node_0, 1*GB, NodeType.OFF_SWITCH);
root.assignContainers(clusterResource, node_0, false,
root.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource));
verifyQueueMetrics(a, 0*GB, clusterResource);
verifyQueueMetrics(b, 1*GB, clusterResource);
@ -620,13 +635,13 @@ public void testOffSwitchScheduling() throws Exception {
// also, B gets a scheduling opportunity since A allocates RACK_LOCAL
stubQueueAllocation(a, clusterResource, node_1, 2*GB, NodeType.RACK_LOCAL);
stubQueueAllocation(b, clusterResource, node_1, 1*GB, NodeType.OFF_SWITCH);
root.assignContainers(clusterResource, node_1, false,
root.assignContainers(clusterResource, node_1,
new ResourceLimits(clusterResource));
InOrder allocationOrder = inOrder(a, b);
allocationOrder.verify(a).assignContainers(eq(clusterResource),
any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits());
any(FiCaSchedulerNode.class), anyResourceLimits());
allocationOrder.verify(b).assignContainers(eq(clusterResource),
any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits());
any(FiCaSchedulerNode.class), anyResourceLimits());
verifyQueueMetrics(a, 2*GB, clusterResource);
verifyQueueMetrics(b, 2*GB, clusterResource);
@ -635,13 +650,13 @@ public void testOffSwitchScheduling() throws Exception {
// However, since B returns off-switch, A won't get an opportunity
stubQueueAllocation(a, clusterResource, node_0, 1*GB, NodeType.NODE_LOCAL);
stubQueueAllocation(b, clusterResource, node_0, 2*GB, NodeType.OFF_SWITCH);
root.assignContainers(clusterResource, node_0, false,
root.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource));
allocationOrder = inOrder(b, a);
allocationOrder.verify(b).assignContainers(eq(clusterResource),
any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits());
any(FiCaSchedulerNode.class), anyResourceLimits());
allocationOrder.verify(a).assignContainers(eq(clusterResource),
any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits());
any(FiCaSchedulerNode.class), anyResourceLimits());
verifyQueueMetrics(a, 2*GB, clusterResource);
verifyQueueMetrics(b, 4*GB, clusterResource);
@ -680,7 +695,7 @@ public void testOffSwitchSchedulingMultiLevelQueues() throws Exception {
// Simulate B3 returning a container on node_0
stubQueueAllocation(b2, clusterResource, node_0, 0*GB, NodeType.OFF_SWITCH);
stubQueueAllocation(b3, clusterResource, node_0, 1*GB, NodeType.OFF_SWITCH);
root.assignContainers(clusterResource, node_0, false,
root.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource));
verifyQueueMetrics(b2, 0*GB, clusterResource);
verifyQueueMetrics(b3, 1*GB, clusterResource);
@ -689,13 +704,13 @@ public void testOffSwitchSchedulingMultiLevelQueues() throws Exception {
// also, B3 gets a scheduling opportunity since B2 allocates RACK_LOCAL
stubQueueAllocation(b2, clusterResource, node_1, 1*GB, NodeType.RACK_LOCAL);
stubQueueAllocation(b3, clusterResource, node_1, 1*GB, NodeType.OFF_SWITCH);
root.assignContainers(clusterResource, node_1, false,
root.assignContainers(clusterResource, node_1,
new ResourceLimits(clusterResource));
InOrder allocationOrder = inOrder(b2, b3);
allocationOrder.verify(b2).assignContainers(eq(clusterResource),
any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits());
any(FiCaSchedulerNode.class), anyResourceLimits());
allocationOrder.verify(b3).assignContainers(eq(clusterResource),
any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits());
any(FiCaSchedulerNode.class), anyResourceLimits());
verifyQueueMetrics(b2, 1*GB, clusterResource);
verifyQueueMetrics(b3, 2*GB, clusterResource);
@ -704,13 +719,13 @@ public void testOffSwitchSchedulingMultiLevelQueues() throws Exception {
// However, since B3 returns off-switch, B2 won't get an opportunity
stubQueueAllocation(b2, clusterResource, node_0, 1*GB, NodeType.NODE_LOCAL);
stubQueueAllocation(b3, clusterResource, node_0, 1*GB, NodeType.OFF_SWITCH);
root.assignContainers(clusterResource, node_0, false,
root.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource));
allocationOrder = inOrder(b3, b2);
allocationOrder.verify(b3).assignContainers(eq(clusterResource),
any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits());
any(FiCaSchedulerNode.class), anyResourceLimits());
allocationOrder.verify(b2).assignContainers(eq(clusterResource),
any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits());
any(FiCaSchedulerNode.class), anyResourceLimits());
verifyQueueMetrics(b2, 1*GB, clusterResource);
verifyQueueMetrics(b3, 3*GB, clusterResource);

View File

@ -265,7 +265,7 @@ public void testReservation() throws Exception {
// Start testing...
// Only AM
a.assignContainers(clusterResource, node_0, false,
a.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource));
assertEquals(2 * GB, a.getUsedResources().getMemory());
assertEquals(2 * GB, app_0.getCurrentConsumption().getMemory());
@ -277,7 +277,7 @@ public void testReservation() throws Exception {
assertEquals(0 * GB, node_2.getUsedResource().getMemory());
// Only 1 map - simulating reduce
a.assignContainers(clusterResource, node_0, false,
a.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource));
assertEquals(5 * GB, a.getUsedResources().getMemory());
assertEquals(5 * GB, app_0.getCurrentConsumption().getMemory());
@ -289,7 +289,7 @@ public void testReservation() throws Exception {
assertEquals(0 * GB, node_2.getUsedResource().getMemory());
// Only 1 map to other node - simulating reduce
a.assignContainers(clusterResource, node_1, false,
a.assignContainers(clusterResource, node_1,
new ResourceLimits(clusterResource));
assertEquals(8 * GB, a.getUsedResources().getMemory());
assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory());
@ -304,7 +304,7 @@ public void testReservation() throws Exception {
assertEquals(2, app_0.getTotalRequiredResources(priorityReduce));
// try to assign reducer (5G on node 0 and should reserve)
a.assignContainers(clusterResource, node_0, false,
a.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource));
assertEquals(13 * GB, a.getUsedResources().getMemory());
assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory());
@ -320,7 +320,7 @@ public void testReservation() throws Exception {
assertEquals(2, app_0.getTotalRequiredResources(priorityReduce));
// assign reducer to node 2
a.assignContainers(clusterResource, node_2, false,
a.assignContainers(clusterResource, node_2,
new ResourceLimits(clusterResource));
assertEquals(18 * GB, a.getUsedResources().getMemory());
assertEquals(13 * GB, app_0.getCurrentConsumption().getMemory());
@ -337,7 +337,7 @@ public void testReservation() throws Exception {
// node_1 heartbeat and unreserves from node_0 in order to allocate
// on node_1
a.assignContainers(clusterResource, node_1, false,
a.assignContainers(clusterResource, node_1,
new ResourceLimits(clusterResource));
assertEquals(18 * GB, a.getUsedResources().getMemory());
assertEquals(18 * GB, app_0.getCurrentConsumption().getMemory());
@ -421,7 +421,7 @@ public void testReservationNoContinueLook() throws Exception {
// Start testing...
// Only AM
a.assignContainers(clusterResource, node_0, false,
a.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource));
assertEquals(2 * GB, a.getUsedResources().getMemory());
assertEquals(2 * GB, app_0.getCurrentConsumption().getMemory());
@ -433,7 +433,7 @@ public void testReservationNoContinueLook() throws Exception {
assertEquals(0 * GB, node_2.getUsedResource().getMemory());
// Only 1 map - simulating reduce
a.assignContainers(clusterResource, node_0, false,
a.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource));
assertEquals(5 * GB, a.getUsedResources().getMemory());
assertEquals(5 * GB, app_0.getCurrentConsumption().getMemory());
@ -445,7 +445,7 @@ public void testReservationNoContinueLook() throws Exception {
assertEquals(0 * GB, node_2.getUsedResource().getMemory());
// Only 1 map to other node - simulating reduce
a.assignContainers(clusterResource, node_1, false,
a.assignContainers(clusterResource, node_1,
new ResourceLimits(clusterResource));
assertEquals(8 * GB, a.getUsedResources().getMemory());
assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory());
@ -460,7 +460,7 @@ public void testReservationNoContinueLook() throws Exception {
assertEquals(2, app_0.getTotalRequiredResources(priorityReduce));
// try to assign reducer (5G on node 0 and should reserve)
a.assignContainers(clusterResource, node_0, false,
a.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource));
assertEquals(13 * GB, a.getUsedResources().getMemory());
assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory());
@ -476,7 +476,7 @@ public void testReservationNoContinueLook() throws Exception {
assertEquals(2, app_0.getTotalRequiredResources(priorityReduce));
// assign reducer to node 2
a.assignContainers(clusterResource, node_2, false,
a.assignContainers(clusterResource, node_2,
new ResourceLimits(clusterResource));
assertEquals(18 * GB, a.getUsedResources().getMemory());
assertEquals(13 * GB, app_0.getCurrentConsumption().getMemory());
@ -493,7 +493,7 @@ public void testReservationNoContinueLook() throws Exception {
// node_1 heartbeat and won't unreserve from node_0, potentially stuck
// if AM doesn't handle
a.assignContainers(clusterResource, node_1, false,
a.assignContainers(clusterResource, node_1,
new ResourceLimits(clusterResource));
assertEquals(18 * GB, a.getUsedResources().getMemory());
assertEquals(13 * GB, app_0.getCurrentConsumption().getMemory());
@ -569,7 +569,7 @@ public void testAssignContainersNeedToUnreserve() throws Exception {
// Start testing...
// Only AM
a.assignContainers(clusterResource, node_0, false,
a.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource));
assertEquals(2 * GB, a.getUsedResources().getMemory());
assertEquals(2 * GB, app_0.getCurrentConsumption().getMemory());
@ -580,7 +580,7 @@ public void testAssignContainersNeedToUnreserve() throws Exception {
assertEquals(0 * GB, node_1.getUsedResource().getMemory());
// Only 1 map - simulating reduce
a.assignContainers(clusterResource, node_0, false,
a.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource));
assertEquals(5 * GB, a.getUsedResources().getMemory());
assertEquals(5 * GB, app_0.getCurrentConsumption().getMemory());
@ -591,7 +591,7 @@ public void testAssignContainersNeedToUnreserve() throws Exception {
assertEquals(0 * GB, node_1.getUsedResource().getMemory());
// Only 1 map to other node - simulating reduce
a.assignContainers(clusterResource, node_1, false,
a.assignContainers(clusterResource, node_1,
new ResourceLimits(clusterResource));
assertEquals(8 * GB, a.getUsedResources().getMemory());
assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory());
@ -605,7 +605,7 @@ public void testAssignContainersNeedToUnreserve() throws Exception {
assertEquals(2, app_0.getTotalRequiredResources(priorityReduce));
// try to assign reducer (5G on node 0 and should reserve)
a.assignContainers(clusterResource, node_0, false,
a.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource));
assertEquals(13 * GB, a.getUsedResources().getMemory());
assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory());
@ -620,7 +620,7 @@ public void testAssignContainersNeedToUnreserve() throws Exception {
assertEquals(2, app_0.getTotalRequiredResources(priorityReduce));
// could allocate but told need to unreserve first
a.assignContainers(clusterResource, node_1, true,
a.assignContainers(clusterResource, node_1,
new ResourceLimits(clusterResource));
assertEquals(13 * GB, a.getUsedResources().getMemory());
assertEquals(13 * GB, app_0.getCurrentConsumption().getMemory());
@ -652,6 +652,8 @@ public void testGetAppToUnreserve() throws Exception {
String host_1 = "host_1";
FiCaSchedulerNode node_1 = TestUtils.getMockNode(host_1, DEFAULT_RACK, 0,
8 * GB);
Resource clusterResource = Resources.createResource(2 * 8 * GB);
// Setup resource-requests
Priority priorityMap = TestUtils.createMockPriority(5);
@ -681,23 +683,28 @@ public void testGetAppToUnreserve() throws Exception {
node_0.getNodeID(), "user", rmContext);
// no reserved containers
NodeId unreserveId = app_0.getNodeIdToUnreserve(priorityMap, capability);
NodeId unreserveId =
app_0.getNodeIdToUnreserve(priorityMap, capability,
cs.getResourceCalculator(), clusterResource);
assertEquals(null, unreserveId);
// no reserved containers - reserve then unreserve
app_0.reserve(node_0, priorityMap, rmContainer_1, container_1);
app_0.unreserve(node_0, priorityMap);
unreserveId = app_0.getNodeIdToUnreserve(priorityMap, capability);
unreserveId = app_0.getNodeIdToUnreserve(priorityMap, capability,
cs.getResourceCalculator(), clusterResource);
assertEquals(null, unreserveId);
// no container large enough is reserved
app_0.reserve(node_0, priorityMap, rmContainer_1, container_1);
unreserveId = app_0.getNodeIdToUnreserve(priorityMap, capability);
unreserveId = app_0.getNodeIdToUnreserve(priorityMap, capability,
cs.getResourceCalculator(), clusterResource);
assertEquals(null, unreserveId);
// reserve one that is now large enough
app_0.reserve(node_1, priorityMap, rmContainer, container);
unreserveId = app_0.getNodeIdToUnreserve(priorityMap, capability);
unreserveId = app_0.getNodeIdToUnreserve(priorityMap, capability,
cs.getResourceCalculator(), clusterResource);
assertEquals(node_1.getNodeID(), unreserveId);
}
@ -741,14 +748,14 @@ public void testFindNodeToUnreserve() throws Exception {
// nothing reserved
boolean res = a.findNodeToUnreserve(csContext.getClusterResource(),
node_1, app_0, priorityMap, capability);
node_1, app_0, priorityMap, capability, capability);
assertFalse(res);
// reserved but scheduler doesn't know about that node.
app_0.reserve(node_1, priorityMap, rmContainer, container);
node_1.reserveResource(app_0, priorityMap, rmContainer);
res = a.findNodeToUnreserve(csContext.getClusterResource(), node_1, app_0,
priorityMap, capability);
priorityMap, capability, capability);
assertFalse(res);
}
@ -815,7 +822,7 @@ public void testAssignToQueue() throws Exception {
// Start testing...
// Only AM
a.assignContainers(clusterResource, node_0, false,
a.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource));
assertEquals(2 * GB, a.getUsedResources().getMemory());
assertEquals(2 * GB, app_0.getCurrentConsumption().getMemory());
@ -826,7 +833,7 @@ public void testAssignToQueue() throws Exception {
assertEquals(0 * GB, node_1.getUsedResource().getMemory());
// Only 1 map - simulating reduce
a.assignContainers(clusterResource, node_0, false,
a.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource));
assertEquals(5 * GB, a.getUsedResources().getMemory());
assertEquals(5 * GB, app_0.getCurrentConsumption().getMemory());
@ -837,7 +844,7 @@ public void testAssignToQueue() throws Exception {
assertEquals(0 * GB, node_1.getUsedResource().getMemory());
// Only 1 map to other node - simulating reduce
a.assignContainers(clusterResource, node_1, false,
a.assignContainers(clusterResource, node_1,
new ResourceLimits(clusterResource));
assertEquals(8 * GB, a.getUsedResources().getMemory());
assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory());
@ -852,14 +859,15 @@ public void testAssignToQueue() throws Exception {
// absoluteMaxCapacity
Resource capability = Resources.createResource(32 * GB, 0);
boolean res =
a.canAssignToThisQueue(clusterResource, capability,
CommonNodeLabelsManager.EMPTY_STRING_SET, app_0, true);
a.canAssignToThisQueue(clusterResource,
CommonNodeLabelsManager.EMPTY_STRING_SET, new ResourceLimits(
clusterResource), capability, Resources.none());
assertFalse(res);
// now add in reservations and make sure it continues if config set
// allocate to queue so that the potential new capacity is greater then
// absoluteMaxCapacity
a.assignContainers(clusterResource, node_0, false,
a.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource));
assertEquals(13 * GB, a.getUsedResources().getMemory());
assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory());
@ -872,14 +880,17 @@ public void testAssignToQueue() throws Exception {
capability = Resources.createResource(5 * GB, 0);
res =
a.canAssignToThisQueue(clusterResource, capability,
CommonNodeLabelsManager.EMPTY_STRING_SET, app_0, true);
a.canAssignToThisQueue(clusterResource,
CommonNodeLabelsManager.EMPTY_STRING_SET, new ResourceLimits(
clusterResource), capability, Resources
.createResource(5 * GB));
assertTrue(res);
// tell to not check reservations
res =
a.canAssignToThisQueue(clusterResource, capability,
CommonNodeLabelsManager.EMPTY_STRING_SET, app_0, false);
a.canAssignToThisQueue(clusterResource,
CommonNodeLabelsManager.EMPTY_STRING_SET, new ResourceLimits(
clusterResource), capability, Resources.none());
assertFalse(res);
refreshQueuesTurnOffReservationsContLook(a, csConf);
@ -887,13 +898,16 @@ public void testAssignToQueue() throws Exception {
// should return false no matter what checkReservations is passed
// in since feature is off
res =
a.canAssignToThisQueue(clusterResource, capability,
CommonNodeLabelsManager.EMPTY_STRING_SET, app_0, false);
a.canAssignToThisQueue(clusterResource,
CommonNodeLabelsManager.EMPTY_STRING_SET, new ResourceLimits(
clusterResource), capability, Resources.none());
assertFalse(res);
res =
a.canAssignToThisQueue(clusterResource, capability,
CommonNodeLabelsManager.EMPTY_STRING_SET, app_0, true);
a.canAssignToThisQueue(clusterResource,
CommonNodeLabelsManager.EMPTY_STRING_SET, new ResourceLimits(
clusterResource), capability, Resources
.createResource(5 * GB));
assertFalse(res);
}
@ -984,16 +998,16 @@ public void testAssignToUser() throws Exception {
app_0.updateResourceRequests(Collections.singletonList(TestUtils
.createResourceRequest(ResourceRequest.ANY, 2 * GB, 1, true,
priorityAM, recordFactory)));
app_0.updateResourceRequests(Collections.singletonList(TestUtils
.createResourceRequest(ResourceRequest.ANY, 5 * GB, 2, true,
priorityReduce, recordFactory)));
app_0.updateResourceRequests(Collections.singletonList(TestUtils
.createResourceRequest(ResourceRequest.ANY, 3 * GB, 2, true,
priorityMap, recordFactory)));
app_0.updateResourceRequests(Collections.singletonList(TestUtils
.createResourceRequest(ResourceRequest.ANY, 5 * GB, 2, true,
priorityReduce, recordFactory)));
// Start testing...
// Only AM
a.assignContainers(clusterResource, node_0, false,
a.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource));
assertEquals(2 * GB, a.getUsedResources().getMemory());
assertEquals(2 * GB, app_0.getCurrentConsumption().getMemory());
@ -1004,7 +1018,7 @@ public void testAssignToUser() throws Exception {
assertEquals(0 * GB, node_1.getUsedResource().getMemory());
// Only 1 map - simulating reduce
a.assignContainers(clusterResource, node_0, false,
a.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource));
assertEquals(5 * GB, a.getUsedResources().getMemory());
assertEquals(5 * GB, app_0.getCurrentConsumption().getMemory());
@ -1015,7 +1029,7 @@ public void testAssignToUser() throws Exception {
assertEquals(0 * GB, node_1.getUsedResource().getMemory());
// Only 1 map to other node - simulating reduce
a.assignContainers(clusterResource, node_1, false,
a.assignContainers(clusterResource, node_1,
new ResourceLimits(clusterResource));
assertEquals(8 * GB, a.getUsedResources().getMemory());
assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory());
@ -1029,7 +1043,7 @@ public void testAssignToUser() throws Exception {
// now add in reservations and make sure it continues if config set
// allocate to queue so that the potential new capacity is greater then
// absoluteMaxCapacity
a.assignContainers(clusterResource, node_0, false,
a.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource));
assertEquals(13 * GB, a.getUsedResources().getMemory());
assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory());
@ -1116,19 +1130,19 @@ public void testReservationsNoneAvailable() throws Exception {
app_0.updateResourceRequests(Collections.singletonList(TestUtils
.createResourceRequest(ResourceRequest.ANY, 2 * GB, 1, true,
priorityAM, recordFactory)));
app_0.updateResourceRequests(Collections.singletonList(TestUtils
.createResourceRequest(ResourceRequest.ANY, 5 * GB, 1, true,
priorityReduce, recordFactory)));
app_0.updateResourceRequests(Collections.singletonList(TestUtils
.createResourceRequest(ResourceRequest.ANY, 3 * GB, 2, true,
priorityMap, recordFactory)));
app_0.updateResourceRequests(Collections.singletonList(TestUtils
.createResourceRequest(ResourceRequest.ANY, 5 * GB, 1, true,
priorityReduce, recordFactory)));
app_0.updateResourceRequests(Collections.singletonList(TestUtils
.createResourceRequest(ResourceRequest.ANY, 8 * GB, 2, true,
priorityLast, recordFactory)));
// Start testing...
// Only AM
a.assignContainers(clusterResource, node_0, false,
a.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource));
assertEquals(2 * GB, a.getUsedResources().getMemory());
assertEquals(2 * GB, app_0.getCurrentConsumption().getMemory());
@ -1140,7 +1154,7 @@ public void testReservationsNoneAvailable() throws Exception {
assertEquals(0 * GB, node_2.getUsedResource().getMemory());
// Only 1 map - simulating reduce
a.assignContainers(clusterResource, node_0, false,
a.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource));
assertEquals(5 * GB, a.getUsedResources().getMemory());
assertEquals(5 * GB, app_0.getCurrentConsumption().getMemory());
@ -1152,7 +1166,7 @@ public void testReservationsNoneAvailable() throws Exception {
assertEquals(0 * GB, node_2.getUsedResource().getMemory());
// Only 1 map to other node - simulating reduce
a.assignContainers(clusterResource, node_1, false,
a.assignContainers(clusterResource, node_1,
new ResourceLimits(clusterResource));
assertEquals(8 * GB, a.getUsedResources().getMemory());
assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory());
@ -1164,38 +1178,41 @@ public void testReservationsNoneAvailable() throws Exception {
assertEquals(3 * GB, node_1.getUsedResource().getMemory());
assertEquals(0 * GB, node_2.getUsedResource().getMemory());
// try to assign reducer (5G on node 0), but tell it
// it has to unreserve. No room to allocate and shouldn't reserve
// since nothing currently reserved.
a.assignContainers(clusterResource, node_0, true,
new ResourceLimits(clusterResource));
// try to assign reducer (5G on node 0), but tell it's resource limits <
// used (8G) + required (5G). It will not reserved since it has to unreserve
// some resource. Even with continous reservation looking, we don't allow
// unreserve resource to reserve container.
a.assignContainers(clusterResource, node_0,
new ResourceLimits(Resources.createResource(10 * GB)));
assertEquals(8 * GB, a.getUsedResources().getMemory());
assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0 * GB, a.getMetrics().getReservedMB());
assertEquals(8 * GB, a.getMetrics().getAllocatedMB());
assertEquals(16 * GB, a.getMetrics().getAvailableMB());
assertEquals(16 * GB, app_0.getHeadroom().getMemory());
// app_0's headroom = limit (10G) - used (8G) = 2G
assertEquals(2 * GB, app_0.getHeadroom().getMemory());
assertEquals(5 * GB, node_0.getUsedResource().getMemory());
assertEquals(3 * GB, node_1.getUsedResource().getMemory());
assertEquals(0 * GB, node_2.getUsedResource().getMemory());
// try to assign reducer (5G on node 2), but tell it
// it has to unreserve. Has room but shouldn't reserve
// since nothing currently reserved.
a.assignContainers(clusterResource, node_2, true,
new ResourceLimits(clusterResource));
// try to assign reducer (5G on node 0), but tell it's resource limits <
// used (8G) + required (5G). It will not reserved since it has to unreserve
// some resource. Unfortunately, there's nothing to unreserve.
a.assignContainers(clusterResource, node_2,
new ResourceLimits(Resources.createResource(10 * GB)));
assertEquals(8 * GB, a.getUsedResources().getMemory());
assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0 * GB, a.getMetrics().getReservedMB());
assertEquals(8 * GB, a.getMetrics().getAllocatedMB());
assertEquals(16 * GB, a.getMetrics().getAvailableMB());
assertEquals(16 * GB, app_0.getHeadroom().getMemory());
// app_0's headroom = limit (10G) - used (8G) = 2G
assertEquals(2 * GB, app_0.getHeadroom().getMemory());
assertEquals(5 * GB, node_0.getUsedResource().getMemory());
assertEquals(3 * GB, node_1.getUsedResource().getMemory());
assertEquals(0 * GB, node_2.getUsedResource().getMemory());
// let it assign 5G to node_2
a.assignContainers(clusterResource, node_2, false,
a.assignContainers(clusterResource, node_2,
new ResourceLimits(clusterResource));
assertEquals(13 * GB, a.getUsedResources().getMemory());
assertEquals(13 * GB, app_0.getCurrentConsumption().getMemory());
@ -1208,7 +1225,7 @@ public void testReservationsNoneAvailable() throws Exception {
assertEquals(5 * GB, node_2.getUsedResource().getMemory());
// reserve 8G node_0
a.assignContainers(clusterResource, node_0, false,
a.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource));
assertEquals(21 * GB, a.getUsedResources().getMemory());
assertEquals(13 * GB, app_0.getCurrentConsumption().getMemory());
@ -1223,7 +1240,7 @@ public void testReservationsNoneAvailable() throws Exception {
// try to assign (8G on node 2). No room to allocate,
// continued to try due to having reservation above,
// but hits queue limits so can't reserve anymore.
a.assignContainers(clusterResource, node_2, false,
a.assignContainers(clusterResource, node_2,
new ResourceLimits(clusterResource));
assertEquals(21 * GB, a.getUsedResources().getMemory());
assertEquals(13 * GB, app_0.getCurrentConsumption().getMemory());