YARN-3243. CapacityScheduler should pass headroom from parent to children to make sure ParentQueue obey its capacity limits. Contributed by Wangda Tan.

(cherry picked from commit 487374b7fe)

(cherry picked from commit 1c601e492f)
This commit is contained in:
Jian He 2015-03-17 10:22:15 -07:00 committed by Wangda Tan
parent 04dd05989e
commit a38dbbc5b1
13 changed files with 560 additions and 516 deletions

View File

@ -8,6 +8,9 @@ Release 2.7.1 - UNRELEASED
IMPROVEMENTS
YARN-3243. CapacityScheduler should pass headroom from parent to children
to make sure ParentQueue obey its capacity limits. (Wangda Tan via jianhe)
OPTIMIZATIONS
BUG FIXES

View File

@ -20,10 +20,13 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AccessControlList;
@ -34,6 +37,7 @@ import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
import org.apache.hadoop.yarn.security.AccessType;
import org.apache.hadoop.yarn.security.PrivilegedEntity;
import org.apache.hadoop.yarn.security.PrivilegedEntity.EntityType;
@ -49,6 +53,7 @@ import org.apache.hadoop.yarn.util.resource.Resources;
import com.google.common.collect.Sets;
public abstract class AbstractCSQueue implements CSQueue {
private static final Log LOG = LogFactory.getLog(AbstractCSQueue.class);
CSQueue parent;
final String queueName;
@ -406,21 +411,102 @@ public abstract class AbstractCSQueue implements CSQueue {
parentQ.getPreemptionDisabled());
}
protected Resource getCurrentResourceLimit(Resource clusterResource,
ResourceLimits currentResourceLimits) {
private Resource getCurrentLimitResource(String nodeLabel,
Resource clusterResource, ResourceLimits currentResourceLimits) {
/*
* Queue's max available resource = min(my.max, my.limit)
* my.limit is set by my parent, considered used resource of my siblings
* Current limit resource: For labeled resource: limit = queue-max-resource
* (TODO, this part need update when we support labeled-limit) For
* non-labeled resource: limit = min(queue-max-resource,
* limit-set-by-parent)
*/
Resource queueMaxResource =
Resources.multiplyAndNormalizeDown(resourceCalculator, clusterResource,
queueCapacities.getAbsoluteMaximumCapacity(), minimumAllocation);
Resource queueCurrentResourceLimit =
Resources.min(resourceCalculator, clusterResource, queueMaxResource,
currentResourceLimits.getLimit());
queueCurrentResourceLimit =
Resources.roundDown(resourceCalculator, queueCurrentResourceLimit,
minimumAllocation);
return queueCurrentResourceLimit;
Resources.multiplyAndNormalizeDown(resourceCalculator,
labelManager.getResourceByLabel(nodeLabel, clusterResource),
queueCapacities.getAbsoluteMaximumCapacity(nodeLabel), minimumAllocation);
if (nodeLabel.equals(RMNodeLabelsManager.NO_LABEL)) {
return Resources.min(resourceCalculator, clusterResource,
queueMaxResource, currentResourceLimits.getLimit());
}
return queueMaxResource;
}
synchronized boolean canAssignToThisQueue(Resource clusterResource,
Set<String> nodeLabels, ResourceLimits currentResourceLimits,
Resource nowRequired, Resource resourceCouldBeUnreserved) {
// Get label of this queue can access, it's (nodeLabel AND queueLabel)
Set<String> labelCanAccess;
if (null == nodeLabels || nodeLabels.isEmpty()) {
labelCanAccess = new HashSet<String>();
// Any queue can always access any node without label
labelCanAccess.add(RMNodeLabelsManager.NO_LABEL);
} else {
labelCanAccess = new HashSet<String>(
accessibleLabels.contains(CommonNodeLabelsManager.ANY) ? nodeLabels
: Sets.intersection(accessibleLabels, nodeLabels));
}
for (String label : labelCanAccess) {
// New total resource = used + required
Resource newTotalResource =
Resources.add(queueUsage.getUsed(label), nowRequired);
Resource currentLimitResource =
getCurrentLimitResource(label, clusterResource, currentResourceLimits);
// if reservation continous looking enabled, check to see if could we
// potentially use this node instead of a reserved node if the application
// has reserved containers.
// TODO, now only consider reservation cases when the node has no label
if (this.reservationsContinueLooking
&& label.equals(RMNodeLabelsManager.NO_LABEL)
&& Resources.greaterThan(resourceCalculator, clusterResource,
resourceCouldBeUnreserved, Resources.none())) {
// resource-without-reserved = used - reserved
Resource newTotalWithoutReservedResource =
Resources.subtract(newTotalResource, resourceCouldBeUnreserved);
// when total-used-without-reserved-resource < currentLimit, we still
// have chance to allocate on this node by unreserving some containers
if (Resources.lessThan(resourceCalculator, clusterResource,
newTotalWithoutReservedResource, currentLimitResource)) {
if (LOG.isDebugEnabled()) {
LOG.debug("try to use reserved: " + getQueueName()
+ " usedResources: " + queueUsage.getUsed()
+ ", clusterResources: " + clusterResource
+ ", reservedResources: " + resourceCouldBeUnreserved
+ ", capacity-without-reserved: "
+ newTotalWithoutReservedResource + ", maxLimitCapacity: "
+ currentLimitResource);
}
return true;
}
}
// Otherwise, if any of the label of this node beyond queue limit, we
// cannot allocate on this node. Consider a small epsilon here.
if (Resources.greaterThan(resourceCalculator, clusterResource,
newTotalResource, currentLimitResource)) {
return false;
}
if (LOG.isDebugEnabled()) {
LOG.debug(getQueueName()
+ "Check assign to queue, label=" + label
+ " usedResources: " + queueUsage.getUsed(label)
+ " clusterResources: " + clusterResource
+ " currentUsedCapacity "
+ Resources.divide(resourceCalculator, clusterResource,
queueUsage.getUsed(label),
labelManager.getResourceByLabel(label, clusterResource))
+ " max-capacity: "
+ queueCapacities.getAbsoluteMaximumCapacity(label)
+ ")");
}
return true;
}
// Actually, this will not happen, since labelCanAccess will be always
// non-empty
return false;
}
}

View File

@ -189,13 +189,11 @@ extends org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue {
* Assign containers to applications in the queue or it's children (if any).
* @param clusterResource the resource of the cluster.
* @param node node on which resources are available
* @param needToUnreserve assign container only if it can unreserve one first
* @param resourceLimits how much overall resource of this queue can use.
* @return the assignment
*/
public CSAssignment assignContainers(Resource clusterResource,
FiCaSchedulerNode node, boolean needToUnreserve,
ResourceLimits resourceLimits);
FiCaSchedulerNode node, ResourceLimits resourceLimits);
/**
* A container assigned to the queue has completed.

View File

@ -1057,9 +1057,14 @@ public class CapacityScheduler extends
node.getNodeID());
LeafQueue queue = ((LeafQueue)reservedApplication.getQueue());
CSAssignment assignment = queue.assignContainers(clusterResource, node,
false, new ResourceLimits(
clusterResource));
CSAssignment assignment =
queue.assignContainers(
clusterResource,
node,
// TODO, now we only consider limits for parent for non-labeled
// resources, should consider labeled resources as well.
new ResourceLimits(labelManager.getResourceByLabel(
RMNodeLabelsManager.NO_LABEL, clusterResource)));
RMContainer excessReservation = assignment.getExcessReservation();
if (excessReservation != null) {
@ -1083,8 +1088,13 @@ public class CapacityScheduler extends
LOG.debug("Trying to schedule on node: " + node.getNodeName() +
", available: " + node.getAvailableResource());
}
root.assignContainers(clusterResource, node, false, new ResourceLimits(
clusterResource));
root.assignContainers(
clusterResource,
node,
// TODO, now we only consider limits for parent for non-labeled
// resources, should consider labeled resources as well.
new ResourceLimits(labelManager.getResourceByLabel(
RMNodeLabelsManager.NO_LABEL, clusterResource)));
}
} else {
LOG.info("Skipping scheduling since node " + node.getNodeID() +
@ -1205,6 +1215,13 @@ public class CapacityScheduler extends
usePortForNodeName, nodeManager.getNodeLabels());
this.nodes.put(nodeManager.getNodeID(), schedulerNode);
Resources.addTo(clusterResource, nodeManager.getTotalCapability());
// update this node to node label manager
if (labelManager != null) {
labelManager.activateNode(nodeManager.getNodeID(),
nodeManager.getTotalCapability());
}
root.updateClusterResource(clusterResource, new ResourceLimits(
clusterResource));
int numNodes = numNodeManagers.incrementAndGet();
@ -1216,12 +1233,6 @@ public class CapacityScheduler extends
if (scheduleAsynchronously && numNodes == 1) {
asyncSchedulerThread.beginSchedule();
}
// update this node to node label manager
if (labelManager != null) {
labelManager.activateNode(nodeManager.getNodeID(),
nodeManager.getTotalCapability());
}
}
private synchronized void removeNode(RMNode nodeInfo) {

View File

@ -76,7 +76,6 @@ import org.apache.hadoop.yarn.server.utils.Lock.NoLock;
import org.apache.hadoop.yarn.util.resource.Resources;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Sets;
@Private
@Unstable
@ -157,7 +156,7 @@ public class LeafQueue extends AbstractCSQueue {
// and all queues may not be realized yet, we'll use (optimistic)
// absoluteMaxCapacity (it will be replaced with the more accurate
// absoluteMaxAvailCapacity during headroom/userlimit/allocation events)
computeQueueCurrentLimitAndSetHeadroomInfo(clusterResource);
setQueueResourceLimitsInfo(clusterResource);
CapacitySchedulerConfiguration conf = csContext.getConfiguration();
userLimit = conf.getUserLimit(getQueuePath());
@ -742,9 +741,8 @@ public class LeafQueue extends AbstractCSQueue {
@Override
public synchronized CSAssignment assignContainers(Resource clusterResource,
FiCaSchedulerNode node, boolean needToUnreserve,
ResourceLimits currentResourceLimits) {
this.currentResourceLimits = currentResourceLimits;
FiCaSchedulerNode node, ResourceLimits currentResourceLimits) {
updateCurrentResourceLimits(currentResourceLimits, clusterResource);
if(LOG.isDebugEnabled()) {
LOG.debug("assignContainers: node=" + node.getNodeName()
@ -799,7 +797,7 @@ public class LeafQueue extends AbstractCSQueue {
continue;
}
if (!this.reservationsContinueLooking) {
if (!needContainers(application, priority, required)) {
if (!shouldAllocOrReserveNewContainer(application, priority, required)) {
if (LOG.isDebugEnabled()) {
LOG.debug("doesn't need containers based on reservation algo!");
}
@ -821,8 +819,8 @@ public class LeafQueue extends AbstractCSQueue {
required, requestedNodeLabels);
// Check queue max-capacity limit
if (!canAssignToThisQueue(clusterResource, required,
node.getLabels(), application, true)) {
if (!super.canAssignToThisQueue(clusterResource, node.getLabels(),
this.currentResourceLimits, required, application.getCurrentReservation())) {
return NULL_ASSIGNMENT;
}
@ -838,7 +836,7 @@ public class LeafQueue extends AbstractCSQueue {
// Try to schedule
CSAssignment assignment =
assignContainersOnNode(clusterResource, node, application, priority,
null, needToUnreserve);
null);
// Did the application skip this node?
if (assignment.getSkipped()) {
@ -899,7 +897,7 @@ public class LeafQueue extends AbstractCSQueue {
// Try to assign if we have sufficient resources
assignContainersOnNode(clusterResource, node, application, priority,
rmContainer, false);
rmContainer);
// Doesn't matter... since it's already charged for at time of reservation
// "re-reservation" is *free*
@ -941,102 +939,14 @@ public class LeafQueue extends AbstractCSQueue {
Resources.roundDown(resourceCalculator, headroom, minimumAllocation);
return headroom;
}
synchronized boolean canAssignToThisQueue(Resource clusterResource,
Resource required, Set<String> nodeLabels, FiCaSchedulerApp application,
boolean checkReservations) {
// Get label of this queue can access, it's (nodeLabel AND queueLabel)
Set<String> labelCanAccess;
if (null == nodeLabels || nodeLabels.isEmpty()) {
labelCanAccess = new HashSet<String>();
// Any queue can always access any node without label
labelCanAccess.add(RMNodeLabelsManager.NO_LABEL);
} else {
labelCanAccess = new HashSet<String>(Sets.intersection(accessibleLabels, nodeLabels));
}
boolean canAssign = true;
for (String label : labelCanAccess) {
Resource potentialTotalCapacity =
Resources.add(queueUsage.getUsed(label), required);
float potentialNewCapacity =
Resources.divide(resourceCalculator, clusterResource,
potentialTotalCapacity,
labelManager.getResourceByLabel(label, clusterResource));
// if enabled, check to see if could we potentially use this node instead
// of a reserved node if the application has reserved containers
// TODO, now only consider reservation cases when the node has no label
if (this.reservationsContinueLooking && checkReservations
&& label.equals(RMNodeLabelsManager.NO_LABEL)) {
float potentialNewWithoutReservedCapacity = Resources.divide(
resourceCalculator,
clusterResource,
Resources.subtract(potentialTotalCapacity,
application.getCurrentReservation()),
labelManager.getResourceByLabel(label, clusterResource));
if (potentialNewWithoutReservedCapacity <= queueCapacities
.getAbsoluteMaximumCapacity()) {
if (LOG.isDebugEnabled()) {
LOG.debug("try to use reserved: "
+ getQueueName()
+ " usedResources: "
+ queueUsage.getUsed()
+ " clusterResources: "
+ clusterResource
+ " reservedResources: "
+ application.getCurrentReservation()
+ " currentCapacity "
+ Resources.divide(resourceCalculator, clusterResource,
queueUsage.getUsed(), clusterResource) + " required " + required
+ " potentialNewWithoutReservedCapacity: "
+ potentialNewWithoutReservedCapacity + " ( "
+ " max-capacity: "
+ queueCapacities.getAbsoluteMaximumCapacity() + ")");
}
// we could potentially use this node instead of reserved node
return true;
}
}
// Otherwise, if any of the label of this node beyond queue limit, we
// cannot allocate on this node. Consider a small epsilon here.
if (potentialNewCapacity > queueCapacities
.getAbsoluteMaximumCapacity(label) + 1e-4) {
canAssign = false;
break;
}
if (LOG.isDebugEnabled()) {
LOG.debug(getQueueName()
+ "Check assign to queue, label=" + label
+ " usedResources: " + queueUsage.getUsed(label)
+ " clusterResources: " + clusterResource
+ " currentCapacity "
+ Resources.divide(resourceCalculator, clusterResource,
queueUsage.getUsed(label),
labelManager.getResourceByLabel(label, clusterResource))
+ " potentialNewCapacity: " + potentialNewCapacity + " ( "
+ " max-capacity: " + queueCapacities.getAbsoluteMaximumCapacity()
+ ")");
}
}
return canAssign;
}
private Resource computeQueueCurrentLimitAndSetHeadroomInfo(
private void setQueueResourceLimitsInfo(
Resource clusterResource) {
Resource queueCurrentResourceLimit =
getCurrentResourceLimit(clusterResource, currentResourceLimits);
synchronized (queueResourceLimitsInfo) {
queueResourceLimitsInfo.setQueueCurrentLimit(queueCurrentResourceLimit);
queueResourceLimitsInfo.setQueueCurrentLimit(currentResourceLimits
.getLimit());
queueResourceLimitsInfo.setClusterResource(clusterResource);
}
return queueCurrentResourceLimit;
}
@Lock({LeafQueue.class, FiCaSchedulerApp.class})
@ -1051,16 +961,16 @@ public class LeafQueue extends AbstractCSQueue {
computeUserLimit(application, clusterResource, required,
queueUser, requestedLabels);
Resource currentResourceLimit =
computeQueueCurrentLimitAndSetHeadroomInfo(clusterResource);
setQueueResourceLimitsInfo(clusterResource);
Resource headroom =
getHeadroom(queueUser, currentResourceLimit, clusterResource, userLimit);
getHeadroom(queueUser, currentResourceLimits.getLimit(),
clusterResource, userLimit);
if (LOG.isDebugEnabled()) {
LOG.debug("Headroom calculation for user " + user + ": " +
" userLimit=" + userLimit +
" queueMaxAvailRes=" + currentResourceLimit +
" queueMaxAvailRes=" + currentResourceLimits.getLimit() +
" consumed=" + queueUser.getUsed() +
" headroom=" + headroom);
}
@ -1210,8 +1120,8 @@ public class LeafQueue extends AbstractCSQueue {
return true;
}
boolean needContainers(FiCaSchedulerApp application, Priority priority,
Resource required) {
boolean shouldAllocOrReserveNewContainer(FiCaSchedulerApp application,
Priority priority, Resource required) {
int requiredContainers = application.getTotalRequiredResources(priority);
int reservedContainers = application.getNumReservedContainers(priority);
int starvation = 0;
@ -1243,7 +1153,7 @@ public class LeafQueue extends AbstractCSQueue {
private CSAssignment assignContainersOnNode(Resource clusterResource,
FiCaSchedulerNode node, FiCaSchedulerApp application, Priority priority,
RMContainer reservedContainer, boolean needToUnreserve) {
RMContainer reservedContainer) {
Resource assigned = Resources.none();
NodeType requestType = null;
@ -1255,7 +1165,7 @@ public class LeafQueue extends AbstractCSQueue {
requestType = NodeType.NODE_LOCAL;
assigned =
assignNodeLocalContainers(clusterResource, nodeLocalResourceRequest,
node, application, priority, reservedContainer, needToUnreserve,
node, application, priority, reservedContainer,
allocatedContainer);
if (Resources.greaterThan(resourceCalculator, clusterResource,
assigned, Resources.none())) {
@ -1283,7 +1193,7 @@ public class LeafQueue extends AbstractCSQueue {
assigned =
assignRackLocalContainers(clusterResource, rackLocalResourceRequest,
node, application, priority, reservedContainer, needToUnreserve,
node, application, priority, reservedContainer,
allocatedContainer);
if (Resources.greaterThan(resourceCalculator, clusterResource,
assigned, Resources.none())) {
@ -1311,7 +1221,7 @@ public class LeafQueue extends AbstractCSQueue {
assigned =
assignOffSwitchContainers(clusterResource, offSwitchResourceRequest,
node, application, priority, reservedContainer, needToUnreserve,
node, application, priority, reservedContainer,
allocatedContainer);
// update locality statistics
@ -1323,13 +1233,24 @@ public class LeafQueue extends AbstractCSQueue {
return SKIP_ASSIGNMENT;
}
private Resource getMinimumResourceNeedUnreserved(Resource askedResource) {
// First we need to get minimum resource we need unreserve
// minimum-resource-need-unreserve = used + asked - limit
Resource minimumUnreservedResource =
Resources.subtract(Resources.add(queueUsage.getUsed(), askedResource),
currentResourceLimits.getLimit());
return minimumUnreservedResource;
}
@Private
protected boolean findNodeToUnreserve(Resource clusterResource,
FiCaSchedulerNode node, FiCaSchedulerApp application, Priority priority,
Resource capability) {
Resource askedResource, Resource minimumUnreservedResource) {
// need to unreserve some other container first
NodeId idToUnreserve = application.getNodeIdToUnreserve(priority, capability);
NodeId idToUnreserve =
application.getNodeIdToUnreserve(priority, minimumUnreservedResource,
resourceCalculator, clusterResource);
if (idToUnreserve == null) {
if (LOG.isDebugEnabled()) {
LOG.debug("checked to see if could unreserve for app but nothing "
@ -1346,7 +1267,7 @@ public class LeafQueue extends AbstractCSQueue {
LOG.debug("unreserving for app: " + application.getApplicationId()
+ " on nodeId: " + idToUnreserve
+ " in order to replace reserved application and place it on node: "
+ node.getNodeID() + " needing: " + capability);
+ node.getNodeID() + " needing: " + askedResource);
}
// headroom
@ -1367,15 +1288,7 @@ public class LeafQueue extends AbstractCSQueue {
@Private
protected boolean checkLimitsToReserve(Resource clusterResource,
FiCaSchedulerApp application, Resource capability,
boolean needToUnreserve) {
if (needToUnreserve) {
if (LOG.isDebugEnabled()) {
LOG.debug("we needed to unreserve to be able to allocate");
}
return false;
}
FiCaSchedulerApp application, Resource capability) {
// we can't reserve if we got here based on the limit
// checks assuming we could unreserve!!!
Resource userLimit = computeUserLimitAndSetHeadroom(application,
@ -1383,7 +1296,8 @@ public class LeafQueue extends AbstractCSQueue {
// Check queue max-capacity limit,
// TODO: Consider reservation on labels
if (!canAssignToThisQueue(clusterResource, capability, null, application, false)) {
if (!canAssignToThisQueue(clusterResource, null,
this.currentResourceLimits, capability, Resources.none())) {
if (LOG.isDebugEnabled()) {
LOG.debug("was going to reserve but hit queue limit");
}
@ -1405,43 +1319,40 @@ public class LeafQueue extends AbstractCSQueue {
private Resource assignNodeLocalContainers(Resource clusterResource,
ResourceRequest nodeLocalResourceRequest, FiCaSchedulerNode node,
FiCaSchedulerApp application, Priority priority,
RMContainer reservedContainer, boolean needToUnreserve,
MutableObject allocatedContainer) {
RMContainer reservedContainer, MutableObject allocatedContainer) {
if (canAssign(application, priority, node, NodeType.NODE_LOCAL,
reservedContainer)) {
return assignContainer(clusterResource, node, application, priority,
nodeLocalResourceRequest, NodeType.NODE_LOCAL, reservedContainer,
needToUnreserve, allocatedContainer);
allocatedContainer);
}
return Resources.none();
}
private Resource assignRackLocalContainers(
Resource clusterResource, ResourceRequest rackLocalResourceRequest,
FiCaSchedulerNode node, FiCaSchedulerApp application, Priority priority,
RMContainer reservedContainer, boolean needToUnreserve,
MutableObject allocatedContainer) {
private Resource assignRackLocalContainers(Resource clusterResource,
ResourceRequest rackLocalResourceRequest, FiCaSchedulerNode node,
FiCaSchedulerApp application, Priority priority,
RMContainer reservedContainer, MutableObject allocatedContainer) {
if (canAssign(application, priority, node, NodeType.RACK_LOCAL,
reservedContainer)) {
return assignContainer(clusterResource, node, application, priority,
rackLocalResourceRequest, NodeType.RACK_LOCAL, reservedContainer,
needToUnreserve, allocatedContainer);
allocatedContainer);
}
return Resources.none();
}
private Resource assignOffSwitchContainers(
Resource clusterResource, ResourceRequest offSwitchResourceRequest,
FiCaSchedulerNode node, FiCaSchedulerApp application, Priority priority,
RMContainer reservedContainer, boolean needToUnreserve,
MutableObject allocatedContainer) {
private Resource assignOffSwitchContainers(Resource clusterResource,
ResourceRequest offSwitchResourceRequest, FiCaSchedulerNode node,
FiCaSchedulerApp application, Priority priority,
RMContainer reservedContainer, MutableObject allocatedContainer) {
if (canAssign(application, priority, node, NodeType.OFF_SWITCH,
reservedContainer)) {
return assignContainer(clusterResource, node, application, priority,
offSwitchResourceRequest, NodeType.OFF_SWITCH, reservedContainer,
needToUnreserve, allocatedContainer);
allocatedContainer);
}
return Resources.none();
@ -1525,13 +1436,12 @@ public class LeafQueue extends AbstractCSQueue {
private Resource assignContainer(Resource clusterResource, FiCaSchedulerNode node,
FiCaSchedulerApp application, Priority priority,
ResourceRequest request, NodeType type, RMContainer rmContainer,
boolean needToUnreserve, MutableObject createdContainer) {
MutableObject createdContainer) {
if (LOG.isDebugEnabled()) {
LOG.debug("assignContainers: node=" + node.getNodeName()
+ " application=" + application.getApplicationId()
+ " priority=" + priority.getPriority()
+ " request=" + request + " type=" + type
+ " needToUnreserve= " + needToUnreserve);
+ " request=" + request + " type=" + type);
}
// check if the resource request can access the label
@ -1551,12 +1461,14 @@ public class LeafQueue extends AbstractCSQueue {
Resource available = node.getAvailableResource();
Resource totalResource = node.getTotalResource();
if (!Resources.fitsIn(capability, totalResource)) {
if (!Resources.lessThanOrEqual(resourceCalculator, clusterResource,
capability, totalResource)) {
LOG.warn("Node : " + node.getNodeID()
+ " does not have sufficient resource for request : " + request
+ " node total capability : " + node.getTotalResource());
return Resources.none();
}
assert Resources.greaterThan(
resourceCalculator, clusterResource, available, Resources.none());
@ -1569,18 +1481,9 @@ public class LeafQueue extends AbstractCSQueue {
LOG.warn("Couldn't get container for allocation!");
return Resources.none();
}
// default to true since if reservation continue look feature isn't on
// needContainers is checked earlier and we wouldn't have gotten this far
boolean canAllocContainer = true;
if (this.reservationsContinueLooking) {
// based on reservations can we allocate/reserve more or do we need
// to unreserve one first
canAllocContainer = needContainers(application, priority, capability);
if (LOG.isDebugEnabled()) {
LOG.debug("can alloc container is: " + canAllocContainer);
}
}
boolean shouldAllocOrReserveNewContainer = shouldAllocOrReserveNewContainer(
application, priority, capability);
// Can we allocate a container on this node?
int availableContainers =
@ -1591,25 +1494,25 @@ public class LeafQueue extends AbstractCSQueue {
// Did we previously reserve containers at this 'priority'?
if (rmContainer != null) {
unreserve(application, priority, node, rmContainer);
} else if (this.reservationsContinueLooking
&& (!canAllocContainer || needToUnreserve)) {
// need to unreserve some other container first
boolean res = findNodeToUnreserve(clusterResource, node, application,
priority, capability);
if (!res) {
return Resources.none();
}
} else {
// we got here by possibly ignoring queue capacity limits. If the
// parameter needToUnreserve is true it means we ignored one of those
// limits in the chance we could unreserve. If we are here we aren't
// trying to unreserve so we can't allocate anymore due to that parent
// limit.
if (needToUnreserve) {
if (LOG.isDebugEnabled()) {
LOG.debug("we needed to unreserve to be able to allocate, skipping");
} else if (this.reservationsContinueLooking && node.getLabels().isEmpty()) {
// when reservationsContinueLooking is set, we may need to unreserve
// some containers to meet this queue and its parents' resource limits
// TODO, need change here when we want to support continuous reservation
// looking for labeled partitions.
Resource minimumUnreservedResource =
getMinimumResourceNeedUnreserved(capability);
if (!shouldAllocOrReserveNewContainer
|| Resources.greaterThan(resourceCalculator, clusterResource,
minimumUnreservedResource, Resources.none())) {
boolean containerUnreserved =
findNodeToUnreserve(clusterResource, node, application, priority,
capability, minimumUnreservedResource);
// When (minimum-unreserved-resource > 0 OR we cannot allocate new/reserved
// container (That means we *have to* unreserve some resource to
// continue)). If we failed to unreserve some resource,
if (!containerUnreserved) {
return Resources.none();
}
return Resources.none();
}
}
@ -1635,17 +1538,16 @@ public class LeafQueue extends AbstractCSQueue {
} else {
// if we are allowed to allocate but this node doesn't have space, reserve it or
// if this was an already a reserved container, reserve it again
if ((canAllocContainer) || (rmContainer != null)) {
if (shouldAllocOrReserveNewContainer || rmContainer != null) {
if (reservationsContinueLooking) {
// we got here by possibly ignoring parent queue capacity limits. If
// the parameter needToUnreserve is true it means we ignored one of
// those limits in the chance we could unreserve. If we are here
// we aren't trying to unreserve so we can't allocate
// anymore due to that parent limit
boolean res = checkLimitsToReserve(clusterResource, application, capability,
needToUnreserve);
if (!res) {
if (reservationsContinueLooking && rmContainer == null) {
// we could possibly ignoring parent queue capacity limits when
// reservationsContinueLooking is set.
// If we're trying to reserve a container here, not container will be
// unreserved for reserving the new one. Check limits again before
// reserve the new container
if (!checkLimitsToReserve(clusterResource,
application, capability)) {
return Resources.none();
}
}
@ -1787,18 +1689,36 @@ public class LeafQueue extends AbstractCSQueue {
Resources.multiplyAndNormalizeUp(resourceCalculator, clusterResource,
queueCapacities.getAbsoluteCapacity(), minimumAllocation);
}
private void updateCurrentResourceLimits(
ResourceLimits currentResourceLimits, Resource clusterResource) {
// TODO: need consider non-empty node labels when resource limits supports
// node labels
// Even if ParentQueue will set limits respect child's max queue capacity,
// but when allocating reserved container, CapacityScheduler doesn't do
// this. So need cap limits by queue's max capacity here.
this.currentResourceLimits = currentResourceLimits;
Resource queueMaxResource =
Resources.multiplyAndNormalizeDown(resourceCalculator, labelManager
.getResourceByLabel(RMNodeLabelsManager.NO_LABEL, clusterResource),
queueCapacities
.getAbsoluteMaximumCapacity(RMNodeLabelsManager.NO_LABEL),
minimumAllocation);
this.currentResourceLimits.setLimit(Resources.min(resourceCalculator,
clusterResource, queueMaxResource, currentResourceLimits.getLimit()));
}
@Override
public synchronized void updateClusterResource(Resource clusterResource,
ResourceLimits currentResourceLimits) {
this.currentResourceLimits = currentResourceLimits;
updateCurrentResourceLimits(currentResourceLimits, clusterResource);
lastClusterResource = clusterResource;
updateAbsoluteCapacityResource(clusterResource);
// Update headroom info based on new cluster resource value
// absoluteMaxCapacity now, will be replaced with absoluteMaxAvailCapacity
// during allocation
computeQueueCurrentLimitAndSetHeadroomInfo(clusterResource);
setQueueResourceLimitsInfo(clusterResource);
// Update metrics
CSQueueUtils.updateQueueStatistics(

View File

@ -23,7 +23,6 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@ -48,7 +47,6 @@ import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
import org.apache.hadoop.yarn.security.AccessType;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
@ -63,8 +61,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaS
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.util.resource.Resources;
import com.google.common.collect.Sets;
@Private
@Evolving
public class ParentQueue extends AbstractCSQueue {
@ -380,8 +376,7 @@ public class ParentQueue extends AbstractCSQueue {
@Override
public synchronized CSAssignment assignContainers(Resource clusterResource,
FiCaSchedulerNode node, boolean needToUnreserve,
ResourceLimits resourceLimits) {
FiCaSchedulerNode node, ResourceLimits resourceLimits) {
CSAssignment assignment =
new CSAssignment(Resources.createResource(0, 0), NodeType.NODE_LOCAL);
Set<String> nodeLabels = node.getLabels();
@ -397,21 +392,18 @@ public class ParentQueue extends AbstractCSQueue {
+ getQueueName());
}
boolean localNeedToUnreserve = false;
// Are we over maximum-capacity for this queue?
if (!canAssignToThisQueue(clusterResource, nodeLabels)) {
// check to see if we could if we unreserve first
localNeedToUnreserve = assignToQueueIfUnreserve(clusterResource);
if (!localNeedToUnreserve) {
break;
}
// This will also consider parent's limits and also continuous reservation
// looking
if (!super.canAssignToThisQueue(clusterResource, nodeLabels, resourceLimits,
minimumAllocation, Resources.createResource(getMetrics()
.getReservedMB(), getMetrics().getReservedVirtualCores()))) {
break;
}
// Schedule
CSAssignment assignedToChild =
assignContainersToChildQueues(clusterResource, node,
localNeedToUnreserve | needToUnreserve, resourceLimits);
assignContainersToChildQueues(clusterResource, node, resourceLimits);
assignment.setType(assignedToChild.getType());
// Done if no child-queue assigned anything
@ -459,74 +451,6 @@ public class ParentQueue extends AbstractCSQueue {
return assignment;
}
private synchronized boolean canAssignToThisQueue(Resource clusterResource,
Set<String> nodeLabels) {
Set<String> labelCanAccess =
new HashSet<String>(
accessibleLabels.contains(CommonNodeLabelsManager.ANY) ? nodeLabels
: Sets.intersection(accessibleLabels, nodeLabels));
if (nodeLabels.isEmpty()) {
// Any queue can always access any node without label
labelCanAccess.add(RMNodeLabelsManager.NO_LABEL);
}
boolean canAssign = true;
for (String label : labelCanAccess) {
float currentAbsoluteLabelUsedCapacity =
Resources.divide(resourceCalculator, clusterResource,
queueUsage.getUsed(label),
labelManager.getResourceByLabel(label, clusterResource));
// if any of the label doesn't beyond limit, we can allocate on this node
if (currentAbsoluteLabelUsedCapacity >=
queueCapacities.getAbsoluteMaximumCapacity(label)) {
if (LOG.isDebugEnabled()) {
LOG.debug(getQueueName() + " used=" + queueUsage.getUsed()
+ " current-capacity (" + queueUsage.getUsed(label) + ") "
+ " >= max-capacity ("
+ labelManager.getResourceByLabel(label, clusterResource) + ")");
}
canAssign = false;
break;
}
}
return canAssign;
}
private synchronized boolean assignToQueueIfUnreserve(Resource clusterResource) {
if (this.reservationsContinueLooking) {
// check to see if we could potentially use this node instead of a reserved
// node
Resource reservedResources = Resources.createResource(getMetrics()
.getReservedMB(), getMetrics().getReservedVirtualCores());
float capacityWithoutReservedCapacity = Resources.divide(
resourceCalculator, clusterResource,
Resources.subtract(queueUsage.getUsed(), reservedResources),
clusterResource);
if (capacityWithoutReservedCapacity <= queueCapacities
.getAbsoluteMaximumCapacity()) {
if (LOG.isDebugEnabled()) {
LOG.debug("parent: try to use reserved: " + getQueueName()
+ " usedResources: " + queueUsage.getUsed().getMemory()
+ " clusterResources: " + clusterResource.getMemory()
+ " reservedResources: " + reservedResources.getMemory()
+ " currentCapacity " + ((float) queueUsage.getUsed().getMemory())
/ clusterResource.getMemory()
+ " potentialNewWithoutReservedCapacity: "
+ capacityWithoutReservedCapacity + " ( " + " max-capacity: "
+ queueCapacities.getAbsoluteMaximumCapacity() + ")");
}
// we could potentially use this node instead of reserved node
return true;
}
}
return false;
}
private boolean canAssign(Resource clusterResource, FiCaSchedulerNode node) {
return (node.getReservedContainer() == null) &&
Resources.greaterThanOrEqual(resourceCalculator, clusterResource,
@ -534,28 +458,38 @@ public class ParentQueue extends AbstractCSQueue {
}
private ResourceLimits getResourceLimitsOfChild(CSQueue child,
Resource clusterResource, ResourceLimits myLimits) {
/*
* Set head-room of a given child, limit =
* min(minimum-of-limit-of-this-queue-and-ancestors, this.max) - this.used
* + child.used. To avoid any of this queue's and its ancestors' limit
* being violated
*/
Resource myCurrentLimit =
getCurrentResourceLimit(clusterResource, myLimits);
// My available resource = my-current-limit - my-used-resource
Resource myMaxAvailableResource = Resources.subtract(myCurrentLimit,
getUsedResources());
// Child's limit = my-available-resource + resource-already-used-by-child
Resource clusterResource, ResourceLimits parentLimits) {
// Set resource-limit of a given child, child.limit =
// min(my.limit - my.used + child.used, child.max)
// Parent available resource = parent-limit - parent-used-resource
Resource parentMaxAvailableResource =
Resources.subtract(parentLimits.getLimit(), getUsedResources());
// Child's limit = parent-available-resource + child-used
Resource childLimit =
Resources.add(myMaxAvailableResource, child.getUsedResources());
Resources.add(parentMaxAvailableResource, child.getUsedResources());
// Get child's max resource
Resource childConfiguredMaxResource =
Resources.multiplyAndNormalizeDown(resourceCalculator, labelManager
.getResourceByLabel(RMNodeLabelsManager.NO_LABEL, clusterResource),
child.getAbsoluteMaximumCapacity(), minimumAllocation);
// Child's limit should be capped by child configured max resource
childLimit =
Resources.min(resourceCalculator, clusterResource, childLimit,
childConfiguredMaxResource);
// Normalize before return
childLimit =
Resources.roundDown(resourceCalculator, childLimit, minimumAllocation);
return new ResourceLimits(childLimit);
}
private synchronized CSAssignment assignContainersToChildQueues(
Resource cluster, FiCaSchedulerNode node, boolean needToUnreserve,
ResourceLimits limits) {
Resource cluster, FiCaSchedulerNode node, ResourceLimits limits) {
CSAssignment assignment =
new CSAssignment(Resources.createResource(0, 0), NodeType.NODE_LOCAL);
@ -573,9 +507,7 @@ public class ParentQueue extends AbstractCSQueue {
ResourceLimits childLimits =
getResourceLimitsOfChild(childQueue, cluster, limits);
assignment =
childQueue.assignContainers(cluster, node, needToUnreserve,
childLimits);
assignment = childQueue.assignContainers(cluster, node, childLimits);
if(LOG.isDebugEnabled()) {
LOG.debug("Assigned to queue: " + childQueue.getQueuePath() +
" stats: " + childQueue + " --> " +

View File

@ -276,7 +276,8 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
}
synchronized public NodeId getNodeIdToUnreserve(Priority priority,
Resource capability) {
Resource resourceNeedUnreserve, ResourceCalculator rc,
Resource clusterResource) {
// first go around make this algorithm simple and just grab first
// reservation that has enough resources
@ -285,16 +286,19 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
if ((reservedContainers != null) && (!reservedContainers.isEmpty())) {
for (Map.Entry<NodeId, RMContainer> entry : reservedContainers.entrySet()) {
NodeId nodeId = entry.getKey();
Resource containerResource = entry.getValue().getContainer().getResource();
// make sure we unreserve one with at least the same amount of
// resources, otherwise could affect capacity limits
if (Resources.fitsIn(capability, entry.getValue().getContainer()
.getResource())) {
if (Resources.lessThanOrEqual(rc, clusterResource,
resourceNeedUnreserve, containerResource)) {
if (LOG.isDebugEnabled()) {
LOG.debug("unreserving node with reservation size: "
+ entry.getValue().getContainer().getResource()
+ " in order to allocate container with size: " + capability);
+ containerResource
+ " in order to allocate container with size: " + resourceNeedUnreserve);
}
return entry.getKey();
return nodeId;
}
}
}

View File

@ -611,7 +611,7 @@ public class TestApplicationLimits {
app_0_0.updateResourceRequests(app_0_0_requests);
// Schedule to compute
queue.assignContainers(clusterResource, node_0, false, new ResourceLimits(
queue.assignContainers(clusterResource, node_0, new ResourceLimits(
clusterResource));
Resource expectedHeadroom = Resources.createResource(10*16*GB, 1);
assertEquals(expectedHeadroom, app_0_0.getHeadroom());
@ -631,7 +631,7 @@ public class TestApplicationLimits {
app_0_1.updateResourceRequests(app_0_1_requests);
// Schedule to compute
queue.assignContainers(clusterResource, node_0, false, new ResourceLimits(
queue.assignContainers(clusterResource, node_0, new ResourceLimits(
clusterResource)); // Schedule to compute
assertEquals(expectedHeadroom, app_0_0.getHeadroom());
assertEquals(expectedHeadroom, app_0_1.getHeadroom());// no change
@ -651,7 +651,7 @@ public class TestApplicationLimits {
app_1_0.updateResourceRequests(app_1_0_requests);
// Schedule to compute
queue.assignContainers(clusterResource, node_0, false, new ResourceLimits(
queue.assignContainers(clusterResource, node_0, new ResourceLimits(
clusterResource)); // Schedule to compute
expectedHeadroom = Resources.createResource(10*16*GB / 2, 1); // changes
assertEquals(expectedHeadroom, app_0_0.getHeadroom());
@ -660,7 +660,7 @@ public class TestApplicationLimits {
// Now reduce cluster size and check for the smaller headroom
clusterResource = Resources.createResource(90*16*GB);
queue.assignContainers(clusterResource, node_0, false, new ResourceLimits(
queue.assignContainers(clusterResource, node_0, new ResourceLimits(
clusterResource)); // Schedule to compute
expectedHeadroom = Resources.createResource(9*16*GB / 2, 1); // changes
assertEquals(expectedHeadroom, app_0_0.getHeadroom());

View File

@ -128,6 +128,7 @@ import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ComparisonFailure;
import org.junit.Test;
import org.mockito.Mockito;
@ -2498,6 +2499,64 @@ public class TestCapacityScheduler {
Assert.assertEquals(30 * GB,
am1.doHeartbeat().getAvailableResources().getMemory());
}
@Test
public void testParentQueueMaxCapsAreRespected() throws Exception {
/*
* Queue tree:
* Root
* / \
* A B
* / \
* A1 A2
*/
CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration();
csConf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {"a", "b"});
csConf.setCapacity(A, 50);
csConf.setMaximumCapacity(A, 50);
csConf.setCapacity(B, 50);
// Define 2nd-level queues
csConf.setQueues(A, new String[] {"a1", "a2"});
csConf.setCapacity(A1, 50);
csConf.setUserLimitFactor(A1, 100.0f);
csConf.setCapacity(A2, 50);
csConf.setUserLimitFactor(A2, 100.0f);
csConf.setCapacity(B1, B1_CAPACITY);
csConf.setUserLimitFactor(B1, 100.0f);
YarnConfiguration conf = new YarnConfiguration(csConf);
conf.setBoolean(CapacitySchedulerConfiguration.ENABLE_USER_METRICS, true);
MemoryRMStateStore memStore = new MemoryRMStateStore();
memStore.init(conf);
MockRM rm1 = new MockRM(conf, memStore);
rm1.start();
MockNM nm1 =
new MockNM("127.0.0.1:1234", 24 * GB, rm1.getResourceTrackerService());
nm1.registerNode();
// Launch app1 in a1, resource usage is 1GB (am) + 4GB * 2 = 9GB
RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a1");
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
waitContainerAllocated(am1, 4 * GB, 2, 2, rm1, nm1);
// Try to launch app2 in a2, asked 2GB, should success
RMApp app2 = rm1.submitApp(2 * GB, "app", "user", null, "a2");
MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm1);
try {
// Try to allocate a container, a's usage=11G/max=12
// a1's usage=9G/max=12
// a2's usage=2G/max=12
// In this case, if a2 asked 2G, should fail.
waitContainerAllocated(am2, 2 * GB, 1, 2, rm1, nm1);
} catch (AssertionError failure) {
// Expected, return;
return;
}
Assert.fail("Shouldn't successfully allocate containers for am2, "
+ "queue-a's max capacity will be violated if container allocated");
}
// Verifies headroom passed to ApplicationMaster has been updated in
// RMAppAttemptMetrics

View File

@ -20,7 +20,6 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import static org.junit.Assert.assertEquals;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doNothing;
@ -145,7 +144,7 @@ public class TestChildQueueOrder {
if (allocation > 0) {
doReturn(new CSAssignment(Resources.none(), type)).
when(queue)
.assignContainers(eq(clusterResource), eq(node), anyBoolean(),
.assignContainers(eq(clusterResource), eq(node),
any(ResourceLimits.class));
// Mock the node's resource availability
@ -157,7 +156,7 @@ public class TestChildQueueOrder {
return new CSAssignment(allocatedResource, type);
}
}).
when(queue).assignContainers(eq(clusterResource), eq(node), anyBoolean(),
when(queue).assignContainers(eq(clusterResource), eq(node),
any(ResourceLimits.class));
doNothing().when(node).releaseContainer(any(Container.class));
}
@ -274,7 +273,7 @@ public class TestChildQueueOrder {
stubQueueAllocation(b, clusterResource, node_0, 0*GB);
stubQueueAllocation(c, clusterResource, node_0, 0*GB);
stubQueueAllocation(d, clusterResource, node_0, 0*GB);
root.assignContainers(clusterResource, node_0, false, new ResourceLimits(
root.assignContainers(clusterResource, node_0, new ResourceLimits(
clusterResource));
for(int i=0; i < 2; i++)
{
@ -282,7 +281,7 @@ public class TestChildQueueOrder {
stubQueueAllocation(b, clusterResource, node_0, 1*GB);
stubQueueAllocation(c, clusterResource, node_0, 0*GB);
stubQueueAllocation(d, clusterResource, node_0, 0*GB);
root.assignContainers(clusterResource, node_0, false, new ResourceLimits(
root.assignContainers(clusterResource, node_0, new ResourceLimits(
clusterResource));
}
for(int i=0; i < 3; i++)
@ -291,7 +290,7 @@ public class TestChildQueueOrder {
stubQueueAllocation(b, clusterResource, node_0, 0*GB);
stubQueueAllocation(c, clusterResource, node_0, 1*GB);
stubQueueAllocation(d, clusterResource, node_0, 0*GB);
root.assignContainers(clusterResource, node_0, false, new ResourceLimits(
root.assignContainers(clusterResource, node_0, new ResourceLimits(
clusterResource));
}
for(int i=0; i < 4; i++)
@ -300,7 +299,7 @@ public class TestChildQueueOrder {
stubQueueAllocation(b, clusterResource, node_0, 0*GB);
stubQueueAllocation(c, clusterResource, node_0, 0*GB);
stubQueueAllocation(d, clusterResource, node_0, 1*GB);
root.assignContainers(clusterResource, node_0, false, new ResourceLimits(
root.assignContainers(clusterResource, node_0, new ResourceLimits(
clusterResource));
}
verifyQueueMetrics(a, 1*GB, clusterResource);
@ -334,7 +333,7 @@ public class TestChildQueueOrder {
stubQueueAllocation(b, clusterResource, node_0, 0*GB);
stubQueueAllocation(c, clusterResource, node_0, 0*GB);
stubQueueAllocation(d, clusterResource, node_0, 0*GB);
root.assignContainers(clusterResource, node_0, false, new ResourceLimits(
root.assignContainers(clusterResource, node_0, new ResourceLimits(
clusterResource));
}
verifyQueueMetrics(a, 3*GB, clusterResource);
@ -362,7 +361,7 @@ public class TestChildQueueOrder {
stubQueueAllocation(b, clusterResource, node_0, 1*GB);
stubQueueAllocation(c, clusterResource, node_0, 0*GB);
stubQueueAllocation(d, clusterResource, node_0, 0*GB);
root.assignContainers(clusterResource, node_0, false, new ResourceLimits(
root.assignContainers(clusterResource, node_0, new ResourceLimits(
clusterResource));
verifyQueueMetrics(a, 2*GB, clusterResource);
verifyQueueMetrics(b, 3*GB, clusterResource);
@ -389,7 +388,7 @@ public class TestChildQueueOrder {
stubQueueAllocation(b, clusterResource, node_0, 0*GB);
stubQueueAllocation(c, clusterResource, node_0, 0*GB);
stubQueueAllocation(d, clusterResource, node_0, 0*GB);
root.assignContainers(clusterResource, node_0, false, new ResourceLimits(
root.assignContainers(clusterResource, node_0, new ResourceLimits(
clusterResource));
verifyQueueMetrics(a, 3*GB, clusterResource);
verifyQueueMetrics(b, 2*GB, clusterResource);
@ -404,13 +403,13 @@ public class TestChildQueueOrder {
stubQueueAllocation(b, clusterResource, node_0, 1*GB);
stubQueueAllocation(c, clusterResource, node_0, 0*GB);
stubQueueAllocation(d, clusterResource, node_0, 1*GB);
root.assignContainers(clusterResource, node_0, false, new ResourceLimits(
root.assignContainers(clusterResource, node_0, new ResourceLimits(
clusterResource));
InOrder allocationOrder = inOrder(d,b);
allocationOrder.verify(d).assignContainers(eq(clusterResource),
any(FiCaSchedulerNode.class), anyBoolean(), any(ResourceLimits.class));
any(FiCaSchedulerNode.class), any(ResourceLimits.class));
allocationOrder.verify(b).assignContainers(eq(clusterResource),
any(FiCaSchedulerNode.class), anyBoolean(), any(ResourceLimits.class));
any(FiCaSchedulerNode.class), any(ResourceLimits.class));
verifyQueueMetrics(a, 3*GB, clusterResource);
verifyQueueMetrics(b, 2*GB, clusterResource);
verifyQueueMetrics(c, 3*GB, clusterResource);

View File

@ -350,8 +350,8 @@ public class TestLeafQueue {
// Start testing...
// Only 1 container
a.assignContainers(clusterResource, node_0, false,
new ResourceLimits(clusterResource));
a.assignContainers(clusterResource, node_0, new ResourceLimits(
clusterResource));
assertEquals(
(int)(node_0.getTotalResource().getMemory() * a.getCapacity()) - (1*GB),
a.getMetrics().getAvailableMB());
@ -486,7 +486,7 @@ public class TestLeafQueue {
// Start testing...
// Only 1 container
a.assignContainers(clusterResource, node_0, false,
a.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource));
assertEquals(1*GB, a.getUsedResources().getMemory());
assertEquals(1*GB, app_0.getCurrentConsumption().getMemory());
@ -497,7 +497,7 @@ public class TestLeafQueue {
// Also 2nd -> minCapacity = 1024 since (.1 * 8G) < minAlloc, also
// you can get one container more than user-limit
a.assignContainers(clusterResource, node_0, false,
a.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource));
assertEquals(2*GB, a.getUsedResources().getMemory());
assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
@ -506,7 +506,7 @@ public class TestLeafQueue {
assertEquals(2*GB, a.getMetrics().getAllocatedMB());
// Can't allocate 3rd due to user-limit
a.assignContainers(clusterResource, node_0, false,
a.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource));
assertEquals(2*GB, a.getUsedResources().getMemory());
assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
@ -516,7 +516,7 @@ public class TestLeafQueue {
// Bump up user-limit-factor, now allocate should work
a.setUserLimitFactor(10);
a.assignContainers(clusterResource, node_0, false,
a.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource));
assertEquals(3*GB, a.getUsedResources().getMemory());
assertEquals(3*GB, app_0.getCurrentConsumption().getMemory());
@ -525,7 +525,7 @@ public class TestLeafQueue {
assertEquals(3*GB, a.getMetrics().getAllocatedMB());
// One more should work, for app_1, due to user-limit-factor
a.assignContainers(clusterResource, node_0, false,
a.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource));
assertEquals(4*GB, a.getUsedResources().getMemory());
assertEquals(3*GB, app_0.getCurrentConsumption().getMemory());
@ -536,8 +536,8 @@ public class TestLeafQueue {
// Test max-capacity
// Now - no more allocs since we are at max-cap
a.setMaxCapacity(0.5f);
a.assignContainers(clusterResource, node_0, false,
new ResourceLimits(clusterResource));
a.assignContainers(clusterResource, node_0, new ResourceLimits(
clusterResource));
assertEquals(4*GB, a.getUsedResources().getMemory());
assertEquals(3*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(1*GB, app_1.getCurrentConsumption().getMemory());
@ -652,21 +652,21 @@ public class TestLeafQueue {
// recordFactory)));
// 1 container to user_0
a.assignContainers(clusterResource, node_0, false,
a.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource));
assertEquals(2*GB, a.getUsedResources().getMemory());
assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
// Again one to user_0 since he hasn't exceeded user limit yet
a.assignContainers(clusterResource, node_0, false,
a.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource));
assertEquals(3*GB, a.getUsedResources().getMemory());
assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(1*GB, app_1.getCurrentConsumption().getMemory());
// One more to user_0 since he is the only active user
a.assignContainers(clusterResource, node_1, false,
a.assignContainers(clusterResource, node_1,
new ResourceLimits(clusterResource));
assertEquals(4*GB, a.getUsedResources().getMemory());
assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
@ -718,7 +718,7 @@ public class TestLeafQueue {
assertEquals("There should only be 1 active user!",
1, qb.getActiveUsersManager().getNumActiveUsers());
//get headroom
qb.assignContainers(clusterResource, node_0, false,
qb.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource));
qb.computeUserLimitAndSetHeadroom(app_0, clusterResource, app_0
.getResourceRequest(u0Priority, ResourceRequest.ANY).getCapability(),
@ -738,7 +738,7 @@ public class TestLeafQueue {
TestUtils.createResourceRequest(ResourceRequest.ANY, 4*GB, 1, true,
u1Priority, recordFactory)));
qb.submitApplicationAttempt(app_2, user_1);
qb.assignContainers(clusterResource, node_1, false,
qb.assignContainers(clusterResource, node_1,
new ResourceLimits(clusterResource));
qb.computeUserLimitAndSetHeadroom(app_0, clusterResource, app_0
.getResourceRequest(u0Priority, ResourceRequest.ANY).getCapability(),
@ -781,9 +781,9 @@ public class TestLeafQueue {
u1Priority, recordFactory)));
qb.submitApplicationAttempt(app_1, user_0);
qb.submitApplicationAttempt(app_3, user_1);
qb.assignContainers(clusterResource, node_0, false,
qb.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource));
qb.assignContainers(clusterResource, node_0, false,
qb.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource));
qb.computeUserLimitAndSetHeadroom(app_3, clusterResource, app_3
.getResourceRequest(u1Priority, ResourceRequest.ANY).getCapability(),
@ -802,7 +802,7 @@ public class TestLeafQueue {
app_4.updateResourceRequests(Collections.singletonList(
TestUtils.createResourceRequest(ResourceRequest.ANY, 6*GB, 1, true,
u0Priority, recordFactory)));
qb.assignContainers(clusterResource, node_1, false,
qb.assignContainers(clusterResource, node_1,
new ResourceLimits(clusterResource));
qb.computeUserLimitAndSetHeadroom(app_4, clusterResource, app_4
.getResourceRequest(u0Priority, ResourceRequest.ANY).getCapability(),
@ -875,7 +875,7 @@ public class TestLeafQueue {
TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 1, true,
priority, recordFactory)));
a.assignContainers(clusterResource, node_0, false,
a.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource));
assertEquals(1*GB, a.getUsedResources().getMemory());
assertEquals(1*GB, app_0.getCurrentConsumption().getMemory());
@ -892,7 +892,7 @@ public class TestLeafQueue {
TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 2, true,
priority, recordFactory)));
a.assignContainers(clusterResource, node_0, false,
a.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource));
assertEquals(2*GB, a.getUsedResources().getMemory());
assertEquals(1*GB, app_0.getCurrentConsumption().getMemory());
@ -981,7 +981,7 @@ public class TestLeafQueue {
1, a.getActiveUsersManager().getNumActiveUsers());
// 1 container to user_0
a.assignContainers(clusterResource, node_0, false,
a.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource));
assertEquals(2*GB, a.getUsedResources().getMemory());
assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
@ -992,7 +992,7 @@ public class TestLeafQueue {
// the application is not yet active
// Again one to user_0 since he hasn't exceeded user limit yet
a.assignContainers(clusterResource, node_0, false,
a.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource));
assertEquals(3*GB, a.getUsedResources().getMemory());
assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
@ -1009,7 +1009,7 @@ public class TestLeafQueue {
// No more to user_0 since he is already over user-limit
// and no more containers to queue since it's already at max-cap
a.assignContainers(clusterResource, node_1, false,
a.assignContainers(clusterResource, node_1,
new ResourceLimits(clusterResource));
assertEquals(3*GB, a.getUsedResources().getMemory());
assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
@ -1023,7 +1023,7 @@ public class TestLeafQueue {
TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 0, true,
priority, recordFactory)));
assertEquals(1, a.getActiveUsersManager().getNumActiveUsers());
a.assignContainers(clusterResource, node_1, false,
a.assignContainers(clusterResource, node_1,
new ResourceLimits(clusterResource));
assertEquals(0*GB, app_2.getHeadroom().getMemory()); // hit queue max-cap
}
@ -1094,7 +1094,7 @@ public class TestLeafQueue {
*/
// Only 1 container
a.assignContainers(clusterResource, node_0, false,
a.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource));
assertEquals(1*GB, a.getUsedResources().getMemory());
assertEquals(1*GB, app_0.getCurrentConsumption().getMemory());
@ -1102,7 +1102,7 @@ public class TestLeafQueue {
// Also 2nd -> minCapacity = 1024 since (.1 * 8G) < minAlloc, also
// you can get one container more than user-limit
a.assignContainers(clusterResource, node_0, false,
a.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource));
assertEquals(2*GB, a.getUsedResources().getMemory());
assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
@ -1110,7 +1110,7 @@ public class TestLeafQueue {
// Can't allocate 3rd due to user-limit
a.setUserLimit(25);
a.assignContainers(clusterResource, node_0, false,
a.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource));
assertEquals(2*GB, a.getUsedResources().getMemory());
assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
@ -1129,7 +1129,7 @@ public class TestLeafQueue {
// Now allocations should goto app_2 since
// user_0 is at limit inspite of high user-limit-factor
a.setUserLimitFactor(10);
a.assignContainers(clusterResource, node_0, false,
a.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource));
assertEquals(5*GB, a.getUsedResources().getMemory());
assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
@ -1139,7 +1139,7 @@ public class TestLeafQueue {
// Now allocations should goto app_0 since
// user_0 is at user-limit not above it
a.assignContainers(clusterResource, node_0, false,
a.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource));
assertEquals(6*GB, a.getUsedResources().getMemory());
assertEquals(3*GB, app_0.getCurrentConsumption().getMemory());
@ -1150,7 +1150,7 @@ public class TestLeafQueue {
// Test max-capacity
// Now - no more allocs since we are at max-cap
a.setMaxCapacity(0.5f);
a.assignContainers(clusterResource, node_0, false,
a.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource));
assertEquals(6*GB, a.getUsedResources().getMemory());
assertEquals(3*GB, app_0.getCurrentConsumption().getMemory());
@ -1162,7 +1162,7 @@ public class TestLeafQueue {
// Now, allocations should goto app_3 since it's under user-limit
a.setMaxCapacity(1.0f);
a.setUserLimitFactor(1);
a.assignContainers(clusterResource, node_0, false,
a.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource));
assertEquals(7*GB, a.getUsedResources().getMemory());
assertEquals(3*GB, app_0.getCurrentConsumption().getMemory());
@ -1171,7 +1171,7 @@ public class TestLeafQueue {
assertEquals(1*GB, app_3.getCurrentConsumption().getMemory());
// Now we should assign to app_3 again since user_2 is under user-limit
a.assignContainers(clusterResource, node_0, false,
a.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource));
assertEquals(8*GB, a.getUsedResources().getMemory());
assertEquals(3*GB, app_0.getCurrentConsumption().getMemory());
@ -1271,7 +1271,7 @@ public class TestLeafQueue {
// Start testing...
// Only 1 container
a.assignContainers(clusterResource, node_0, false,
a.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource));
assertEquals(1*GB, a.getUsedResources().getMemory());
assertEquals(1*GB, app_0.getCurrentConsumption().getMemory());
@ -1282,7 +1282,7 @@ public class TestLeafQueue {
// Also 2nd -> minCapacity = 1024 since (.1 * 8G) < minAlloc, also
// you can get one container more than user-limit
a.assignContainers(clusterResource, node_0, false,
a.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource));
assertEquals(2*GB, a.getUsedResources().getMemory());
assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
@ -1291,7 +1291,7 @@ public class TestLeafQueue {
assertEquals(2*GB, a.getMetrics().getAllocatedMB());
// Now, reservation should kick in for app_1
a.assignContainers(clusterResource, node_0, false,
a.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource));
assertEquals(6*GB, a.getUsedResources().getMemory());
assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
@ -1308,7 +1308,7 @@ public class TestLeafQueue {
ContainerState.COMPLETE, "",
ContainerExitStatus.KILLED_BY_RESOURCEMANAGER),
RMContainerEventType.KILL, null, true);
a.assignContainers(clusterResource, node_0, false,
a.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource));
assertEquals(5*GB, a.getUsedResources().getMemory());
assertEquals(1*GB, app_0.getCurrentConsumption().getMemory());
@ -1325,7 +1325,7 @@ public class TestLeafQueue {
ContainerState.COMPLETE, "",
ContainerExitStatus.KILLED_BY_RESOURCEMANAGER),
RMContainerEventType.KILL, null, true);
a.assignContainers(clusterResource, node_0, false,
a.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource));
assertEquals(4*GB, a.getUsedResources().getMemory());
assertEquals(0*GB, app_0.getCurrentConsumption().getMemory());
@ -1393,7 +1393,7 @@ public class TestLeafQueue {
// Start testing...
a.assignContainers(clusterResource, node_0, false,
a.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource));
assertEquals(2*GB, a.getUsedResources().getMemory());
assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
@ -1403,7 +1403,7 @@ public class TestLeafQueue {
assertEquals(0*GB, a.getMetrics().getAvailableMB());
// Now, reservation should kick in for app_1
a.assignContainers(clusterResource, node_0, false,
a.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource));
assertEquals(6*GB, a.getUsedResources().getMemory());
assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
@ -1417,7 +1417,7 @@ public class TestLeafQueue {
// We do not need locality delay here
doReturn(-1).when(a).getNodeLocalityDelay();
a.assignContainers(clusterResource, node_1, false,
a.assignContainers(clusterResource, node_1,
new ResourceLimits(clusterResource));
assertEquals(10*GB, a.getUsedResources().getMemory());
assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
@ -1434,7 +1434,7 @@ public class TestLeafQueue {
ContainerState.COMPLETE, "",
ContainerExitStatus.KILLED_BY_RESOURCEMANAGER),
RMContainerEventType.KILL, null, true);
a.assignContainers(clusterResource, node_0, false,
a.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource));
assertEquals(8*GB, a.getUsedResources().getMemory());
assertEquals(0*GB, app_0.getCurrentConsumption().getMemory());
@ -1503,7 +1503,7 @@ public class TestLeafQueue {
// Start testing...
// Only 1 container
a.assignContainers(clusterResource, node_0, false,
a.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource));
assertEquals(1*GB, a.getUsedResources().getMemory());
assertEquals(1*GB, app_0.getCurrentConsumption().getMemory());
@ -1511,14 +1511,14 @@ public class TestLeafQueue {
// Also 2nd -> minCapacity = 1024 since (.1 * 8G) < minAlloc, also
// you can get one container more than user-limit
a.assignContainers(clusterResource, node_0, false,
a.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource));
assertEquals(2*GB, a.getUsedResources().getMemory());
assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
// Now, reservation should kick in for app_1
a.assignContainers(clusterResource, node_0, false,
a.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource));
assertEquals(6*GB, a.getUsedResources().getMemory());
assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
@ -1533,7 +1533,7 @@ public class TestLeafQueue {
ContainerState.COMPLETE, "",
ContainerExitStatus.KILLED_BY_RESOURCEMANAGER),
RMContainerEventType.KILL, null, true);
a.assignContainers(clusterResource, node_0, false,
a.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource));
assertEquals(5*GB, a.getUsedResources().getMemory());
assertEquals(1*GB, app_0.getCurrentConsumption().getMemory());
@ -1543,7 +1543,7 @@ public class TestLeafQueue {
assertEquals(1, app_1.getReReservations(priority));
// Re-reserve
a.assignContainers(clusterResource, node_0, false,
a.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource));
assertEquals(5*GB, a.getUsedResources().getMemory());
assertEquals(1*GB, app_0.getCurrentConsumption().getMemory());
@ -1553,7 +1553,7 @@ public class TestLeafQueue {
assertEquals(2, app_1.getReReservations(priority));
// Try to schedule on node_1 now, should *move* the reservation
a.assignContainers(clusterResource, node_1, false,
a.assignContainers(clusterResource, node_1,
new ResourceLimits(clusterResource));
assertEquals(9*GB, a.getUsedResources().getMemory());
assertEquals(1*GB, app_0.getCurrentConsumption().getMemory());
@ -1571,7 +1571,7 @@ public class TestLeafQueue {
ContainerState.COMPLETE, "",
ContainerExitStatus.KILLED_BY_RESOURCEMANAGER),
RMContainerEventType.KILL, null, true);
CSAssignment assignment = a.assignContainers(clusterResource, node_0, false,
CSAssignment assignment = a.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource));
assertEquals(8*GB, a.getUsedResources().getMemory());
assertEquals(0*GB, app_0.getCurrentConsumption().getMemory());
@ -1643,7 +1643,7 @@ public class TestLeafQueue {
CSAssignment assignment = null;
// Start with off switch, shouldn't allocate due to delay scheduling
assignment = a.assignContainers(clusterResource, node_2, false,
assignment = a.assignContainers(clusterResource, node_2,
new ResourceLimits(clusterResource));
verify(app_0, never()).allocate(any(NodeType.class), eq(node_2),
any(Priority.class), any(ResourceRequest.class), any(Container.class));
@ -1652,7 +1652,7 @@ public class TestLeafQueue {
assertEquals(NodeType.NODE_LOCAL, assignment.getType()); // None->NODE_LOCAL
// Another off switch, shouldn't allocate due to delay scheduling
assignment = a.assignContainers(clusterResource, node_2, false,
assignment = a.assignContainers(clusterResource, node_2,
new ResourceLimits(clusterResource));
verify(app_0, never()).allocate(any(NodeType.class), eq(node_2),
any(Priority.class), any(ResourceRequest.class), any(Container.class));
@ -1661,7 +1661,7 @@ public class TestLeafQueue {
assertEquals(NodeType.NODE_LOCAL, assignment.getType()); // None->NODE_LOCAL
// Another off switch, shouldn't allocate due to delay scheduling
assignment = a.assignContainers(clusterResource, node_2, false,
assignment = a.assignContainers(clusterResource, node_2,
new ResourceLimits(clusterResource));
verify(app_0, never()).allocate(any(NodeType.class), eq(node_2),
any(Priority.class), any(ResourceRequest.class), any(Container.class));
@ -1671,7 +1671,7 @@ public class TestLeafQueue {
// Another off switch, now we should allocate
// since missedOpportunities=3 and reqdContainers=3
assignment = a.assignContainers(clusterResource, node_2, false,
assignment = a.assignContainers(clusterResource, node_2,
new ResourceLimits(clusterResource));
verify(app_0).allocate(eq(NodeType.OFF_SWITCH), eq(node_2),
any(Priority.class), any(ResourceRequest.class), any(Container.class));
@ -1680,7 +1680,7 @@ public class TestLeafQueue {
assertEquals(NodeType.OFF_SWITCH, assignment.getType());
// NODE_LOCAL - node_0
assignment = a.assignContainers(clusterResource, node_0, false,
assignment = a.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource));
verify(app_0).allocate(eq(NodeType.NODE_LOCAL), eq(node_0),
any(Priority.class), any(ResourceRequest.class), any(Container.class));
@ -1689,7 +1689,7 @@ public class TestLeafQueue {
assertEquals(NodeType.NODE_LOCAL, assignment.getType());
// NODE_LOCAL - node_1
assignment = a.assignContainers(clusterResource, node_1, false,
assignment = a.assignContainers(clusterResource, node_1,
new ResourceLimits(clusterResource));
verify(app_0).allocate(eq(NodeType.NODE_LOCAL), eq(node_1),
any(Priority.class), any(ResourceRequest.class), any(Container.class));
@ -1718,14 +1718,14 @@ public class TestLeafQueue {
doReturn(1).when(a).getNodeLocalityDelay();
// Shouldn't assign RACK_LOCAL yet
assignment = a.assignContainers(clusterResource, node_3, false,
assignment = a.assignContainers(clusterResource, node_3,
new ResourceLimits(clusterResource));
assertEquals(1, app_0.getSchedulingOpportunities(priority));
assertEquals(2, app_0.getTotalRequiredResources(priority));
assertEquals(NodeType.NODE_LOCAL, assignment.getType()); // None->NODE_LOCAL
// Should assign RACK_LOCAL now
assignment = a.assignContainers(clusterResource, node_3, false,
assignment = a.assignContainers(clusterResource, node_3,
new ResourceLimits(clusterResource));
verify(app_0).allocate(eq(NodeType.RACK_LOCAL), eq(node_3),
any(Priority.class), any(ResourceRequest.class), any(Container.class));
@ -1807,7 +1807,7 @@ public class TestLeafQueue {
// Start with off switch, shouldn't allocate P1 due to delay scheduling
// thus, no P2 either!
a.assignContainers(clusterResource, node_2, false,
a.assignContainers(clusterResource, node_2,
new ResourceLimits(clusterResource));
verify(app_0, never()).allocate(any(NodeType.class), eq(node_2),
eq(priority_1), any(ResourceRequest.class), any(Container.class));
@ -1820,7 +1820,7 @@ public class TestLeafQueue {
// Another off-switch, shouldn't allocate P1 due to delay scheduling
// thus, no P2 either!
a.assignContainers(clusterResource, node_2, false,
a.assignContainers(clusterResource, node_2,
new ResourceLimits(clusterResource));
verify(app_0, never()).allocate(any(NodeType.class), eq(node_2),
eq(priority_1), any(ResourceRequest.class), any(Container.class));
@ -1832,7 +1832,7 @@ public class TestLeafQueue {
assertEquals(1, app_0.getTotalRequiredResources(priority_2));
// Another off-switch, shouldn't allocate OFF_SWITCH P1
a.assignContainers(clusterResource, node_2, false,
a.assignContainers(clusterResource, node_2,
new ResourceLimits(clusterResource));
verify(app_0).allocate(eq(NodeType.OFF_SWITCH), eq(node_2),
eq(priority_1), any(ResourceRequest.class), any(Container.class));
@ -1844,7 +1844,7 @@ public class TestLeafQueue {
assertEquals(1, app_0.getTotalRequiredResources(priority_2));
// Now, DATA_LOCAL for P1
a.assignContainers(clusterResource, node_0, false,
a.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource));
verify(app_0).allocate(eq(NodeType.NODE_LOCAL), eq(node_0),
eq(priority_1), any(ResourceRequest.class), any(Container.class));
@ -1856,7 +1856,7 @@ public class TestLeafQueue {
assertEquals(1, app_0.getTotalRequiredResources(priority_2));
// Now, OFF_SWITCH for P2
a.assignContainers(clusterResource, node_1, false,
a.assignContainers(clusterResource, node_1,
new ResourceLimits(clusterResource));
verify(app_0, never()).allocate(any(NodeType.class), eq(node_1),
eq(priority_1), any(ResourceRequest.class), any(Container.class));
@ -1933,7 +1933,7 @@ public class TestLeafQueue {
app_0.updateResourceRequests(app_0_requests_0);
// NODE_LOCAL - node_0_1
a.assignContainers(clusterResource, node_0_0, false,
a.assignContainers(clusterResource, node_0_0,
new ResourceLimits(clusterResource));
verify(app_0).allocate(eq(NodeType.NODE_LOCAL), eq(node_0_0),
any(Priority.class), any(ResourceRequest.class), any(Container.class));
@ -1942,7 +1942,7 @@ public class TestLeafQueue {
// No allocation on node_1_0 even though it's node/rack local since
// required(ANY) == 0
a.assignContainers(clusterResource, node_1_0, false,
a.assignContainers(clusterResource, node_1_0,
new ResourceLimits(clusterResource));
verify(app_0, never()).allocate(any(NodeType.class), eq(node_1_0),
any(Priority.class), any(ResourceRequest.class), any(Container.class));
@ -1959,7 +1959,7 @@ public class TestLeafQueue {
// No allocation on node_0_1 even though it's node/rack local since
// required(rack_1) == 0
a.assignContainers(clusterResource, node_0_1, false,
a.assignContainers(clusterResource, node_0_1,
new ResourceLimits(clusterResource));
verify(app_0, never()).allocate(any(NodeType.class), eq(node_1_0),
any(Priority.class), any(ResourceRequest.class), any(Container.class));
@ -1967,7 +1967,7 @@ public class TestLeafQueue {
assertEquals(1, app_0.getTotalRequiredResources(priority));
// NODE_LOCAL - node_1
a.assignContainers(clusterResource, node_1_0, false,
a.assignContainers(clusterResource, node_1_0,
new ResourceLimits(clusterResource));
verify(app_0).allocate(eq(NodeType.NODE_LOCAL), eq(node_1_0),
any(Priority.class), any(ResourceRequest.class), any(Container.class));
@ -2220,7 +2220,7 @@ public class TestLeafQueue {
// node_0_1
// Shouldn't allocate since RR(rack_0) = null && RR(ANY) = relax: false
a.assignContainers(clusterResource, node_0_1, false,
a.assignContainers(clusterResource, node_0_1,
new ResourceLimits(clusterResource));
verify(app_0, never()).allocate(any(NodeType.class), eq(node_0_1),
any(Priority.class), any(ResourceRequest.class), any(Container.class));
@ -2243,7 +2243,7 @@ public class TestLeafQueue {
// node_1_1
// Shouldn't allocate since RR(rack_1) = relax: false
a.assignContainers(clusterResource, node_1_1, false,
a.assignContainers(clusterResource, node_1_1,
new ResourceLimits(clusterResource));
verify(app_0, never()).allocate(any(NodeType.class), eq(node_0_1),
any(Priority.class), any(ResourceRequest.class), any(Container.class));
@ -2274,7 +2274,7 @@ public class TestLeafQueue {
// node_1_1
// Shouldn't allocate since node_1_1 is blacklisted
a.assignContainers(clusterResource, node_1_1, false,
a.assignContainers(clusterResource, node_1_1,
new ResourceLimits(clusterResource));
verify(app_0, never()).allocate(any(NodeType.class), eq(node_1_1),
any(Priority.class), any(ResourceRequest.class), any(Container.class));
@ -2303,7 +2303,7 @@ public class TestLeafQueue {
// node_1_1
// Shouldn't allocate since rack_1 is blacklisted
a.assignContainers(clusterResource, node_1_1, false,
a.assignContainers(clusterResource, node_1_1,
new ResourceLimits(clusterResource));
verify(app_0, never()).allocate(any(NodeType.class), eq(node_1_1),
any(Priority.class), any(ResourceRequest.class), any(Container.class));
@ -2330,7 +2330,7 @@ public class TestLeafQueue {
// Blacklist: < host_0_0 > <----
// Now, should allocate since RR(rack_1) = relax: true
a.assignContainers(clusterResource, node_1_1, false,
a.assignContainers(clusterResource, node_1_1,
new ResourceLimits(clusterResource));
verify(app_0,never()).allocate(eq(NodeType.RACK_LOCAL), eq(node_1_1),
any(Priority.class), any(ResourceRequest.class), any(Container.class));
@ -2361,7 +2361,7 @@ public class TestLeafQueue {
// host_1_0: 8G
// host_1_1: 7G
a.assignContainers(clusterResource, node_1_0, false,
a.assignContainers(clusterResource, node_1_0,
new ResourceLimits(clusterResource));
verify(app_0).allocate(eq(NodeType.NODE_LOCAL), eq(node_1_0),
any(Priority.class), any(ResourceRequest.class), any(Container.class));
@ -2444,7 +2444,7 @@ public class TestLeafQueue {
recordFactory)));
try {
a.assignContainers(clusterResource, node_0, false,
a.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource));
} catch (NullPointerException e) {
Assert.fail("NPE when allocating container on node but "

View File

@ -156,7 +156,7 @@ public class TestParentQueue {
// Next call - nothing
if (allocation > 0) {
doReturn(new CSAssignment(Resources.none(), type)).when(queue)
.assignContainers(eq(clusterResource), eq(node), eq(false),
.assignContainers(eq(clusterResource), eq(node),
any(ResourceLimits.class));
// Mock the node's resource availability
@ -167,8 +167,7 @@ public class TestParentQueue {
return new CSAssignment(allocatedResource, type);
}
}).
when(queue).assignContainers(eq(clusterResource), eq(node), eq(false),
}).when(queue).assignContainers(eq(clusterResource), eq(node),
any(ResourceLimits.class));
}
@ -232,7 +231,7 @@ when(queue).assignContainers(eq(clusterResource), eq(node), eq(false),
// Simulate B returning a container on node_0
stubQueueAllocation(a, clusterResource, node_0, 0*GB);
stubQueueAllocation(b, clusterResource, node_0, 1*GB);
root.assignContainers(clusterResource, node_0, false,
root.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource));
verifyQueueMetrics(a, 0*GB, clusterResource);
verifyQueueMetrics(b, 1*GB, clusterResource);
@ -240,13 +239,13 @@ when(queue).assignContainers(eq(clusterResource), eq(node), eq(false),
// Now, A should get the scheduling opportunity since A=0G/6G, B=1G/14G
stubQueueAllocation(a, clusterResource, node_1, 2*GB);
stubQueueAllocation(b, clusterResource, node_1, 1*GB);
root.assignContainers(clusterResource, node_1, false,
root.assignContainers(clusterResource, node_1,
new ResourceLimits(clusterResource));
InOrder allocationOrder = inOrder(a, b);
allocationOrder.verify(a).assignContainers(eq(clusterResource),
any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits());
any(FiCaSchedulerNode.class), anyResourceLimits());
allocationOrder.verify(b).assignContainers(eq(clusterResource),
any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits());
any(FiCaSchedulerNode.class), anyResourceLimits());
verifyQueueMetrics(a, 2*GB, clusterResource);
verifyQueueMetrics(b, 2*GB, clusterResource);
@ -254,13 +253,13 @@ when(queue).assignContainers(eq(clusterResource), eq(node), eq(false),
// since A has 2/6G while B has 2/14G
stubQueueAllocation(a, clusterResource, node_0, 1*GB);
stubQueueAllocation(b, clusterResource, node_0, 2*GB);
root.assignContainers(clusterResource, node_0, false,
root.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource));
allocationOrder = inOrder(b, a);
allocationOrder.verify(b).assignContainers(eq(clusterResource),
any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits());
any(FiCaSchedulerNode.class), anyResourceLimits());
allocationOrder.verify(a).assignContainers(eq(clusterResource),
any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits());
any(FiCaSchedulerNode.class), anyResourceLimits());
verifyQueueMetrics(a, 3*GB, clusterResource);
verifyQueueMetrics(b, 4*GB, clusterResource);
@ -268,13 +267,13 @@ when(queue).assignContainers(eq(clusterResource), eq(node), eq(false),
// since A has 3/6G while B has 4/14G
stubQueueAllocation(a, clusterResource, node_0, 0*GB);
stubQueueAllocation(b, clusterResource, node_0, 4*GB);
root.assignContainers(clusterResource, node_0, false,
root.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource));
allocationOrder = inOrder(b, a);
allocationOrder.verify(b).assignContainers(eq(clusterResource),
any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits());
any(FiCaSchedulerNode.class), anyResourceLimits());
allocationOrder.verify(a).assignContainers(eq(clusterResource),
any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits());
any(FiCaSchedulerNode.class), anyResourceLimits());
verifyQueueMetrics(a, 3*GB, clusterResource);
verifyQueueMetrics(b, 8*GB, clusterResource);
@ -282,13 +281,13 @@ when(queue).assignContainers(eq(clusterResource), eq(node), eq(false),
// since A has 3/6G while B has 8/14G
stubQueueAllocation(a, clusterResource, node_1, 1*GB);
stubQueueAllocation(b, clusterResource, node_1, 1*GB);
root.assignContainers(clusterResource, node_1, false,
root.assignContainers(clusterResource, node_1,
new ResourceLimits(clusterResource));
allocationOrder = inOrder(a, b);
allocationOrder.verify(b).assignContainers(eq(clusterResource),
any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits());
any(FiCaSchedulerNode.class), anyResourceLimits());
allocationOrder.verify(a).assignContainers(eq(clusterResource),
any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits());
any(FiCaSchedulerNode.class), anyResourceLimits());
verifyQueueMetrics(a, 4*GB, clusterResource);
verifyQueueMetrics(b, 9*GB, clusterResource);
}
@ -405,6 +404,22 @@ when(queue).assignContainers(eq(clusterResource), eq(node), eq(false),
@Test
public void testMultiLevelQueues() throws Exception {
/*
* Structure of queue:
* Root
* ____________
* / | \ \
* A B C D
* / | / | \ \
* A1 A2 B1 B2 B3 C1
* \
* C11
* \
* C111
* \
* C1111
*/
// Setup queue configs
setupMultiLevelQueues(csConf);
@ -449,7 +464,7 @@ when(queue).assignContainers(eq(clusterResource), eq(node), eq(false),
stubQueueAllocation(b, clusterResource, node_0, 0*GB);
stubQueueAllocation(c, clusterResource, node_0, 1*GB);
stubQueueAllocation(d, clusterResource, node_0, 0*GB);
root.assignContainers(clusterResource, node_0, false,
root.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource));
verifyQueueMetrics(a, 0*GB, clusterResource);
verifyQueueMetrics(b, 0*GB, clusterResource);
@ -462,7 +477,7 @@ when(queue).assignContainers(eq(clusterResource), eq(node), eq(false),
stubQueueAllocation(a, clusterResource, node_1, 0*GB);
stubQueueAllocation(b2, clusterResource, node_1, 4*GB);
stubQueueAllocation(c, clusterResource, node_1, 0*GB);
root.assignContainers(clusterResource, node_1, false,
root.assignContainers(clusterResource, node_1,
new ResourceLimits(clusterResource));
verifyQueueMetrics(a, 0*GB, clusterResource);
verifyQueueMetrics(b, 4*GB, clusterResource);
@ -474,15 +489,15 @@ when(queue).assignContainers(eq(clusterResource), eq(node), eq(false),
stubQueueAllocation(a1, clusterResource, node_0, 1*GB);
stubQueueAllocation(b3, clusterResource, node_0, 2*GB);
stubQueueAllocation(c, clusterResource, node_0, 2*GB);
root.assignContainers(clusterResource, node_0, false,
root.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource));
InOrder allocationOrder = inOrder(a, c, b);
allocationOrder.verify(a).assignContainers(eq(clusterResource),
any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits());
any(FiCaSchedulerNode.class), anyResourceLimits());
allocationOrder.verify(c).assignContainers(eq(clusterResource),
any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits());
any(FiCaSchedulerNode.class), anyResourceLimits());
allocationOrder.verify(b).assignContainers(eq(clusterResource),
any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits());
any(FiCaSchedulerNode.class), anyResourceLimits());
verifyQueueMetrics(a, 1*GB, clusterResource);
verifyQueueMetrics(b, 6*GB, clusterResource);
verifyQueueMetrics(c, 3*GB, clusterResource);
@ -501,17 +516,17 @@ when(queue).assignContainers(eq(clusterResource), eq(node), eq(false),
stubQueueAllocation(b3, clusterResource, node_2, 1*GB);
stubQueueAllocation(b1, clusterResource, node_2, 1*GB);
stubQueueAllocation(c, clusterResource, node_2, 1*GB);
root.assignContainers(clusterResource, node_2, false,
root.assignContainers(clusterResource, node_2,
new ResourceLimits(clusterResource));
allocationOrder = inOrder(a, a2, a1, b, c);
allocationOrder.verify(a).assignContainers(eq(clusterResource),
any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits());
any(FiCaSchedulerNode.class), anyResourceLimits());
allocationOrder.verify(a2).assignContainers(eq(clusterResource),
any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits());
any(FiCaSchedulerNode.class), anyResourceLimits());
allocationOrder.verify(b).assignContainers(eq(clusterResource),
any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits());
any(FiCaSchedulerNode.class), anyResourceLimits());
allocationOrder.verify(c).assignContainers(eq(clusterResource),
any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits());
any(FiCaSchedulerNode.class), anyResourceLimits());
verifyQueueMetrics(a, 3*GB, clusterResource);
verifyQueueMetrics(b, 8*GB, clusterResource);
verifyQueueMetrics(c, 4*GB, clusterResource);
@ -611,7 +626,7 @@ when(queue).assignContainers(eq(clusterResource), eq(node), eq(false),
// Simulate B returning a container on node_0
stubQueueAllocation(a, clusterResource, node_0, 0*GB, NodeType.OFF_SWITCH);
stubQueueAllocation(b, clusterResource, node_0, 1*GB, NodeType.OFF_SWITCH);
root.assignContainers(clusterResource, node_0, false,
root.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource));
verifyQueueMetrics(a, 0*GB, clusterResource);
verifyQueueMetrics(b, 1*GB, clusterResource);
@ -620,13 +635,13 @@ when(queue).assignContainers(eq(clusterResource), eq(node), eq(false),
// also, B gets a scheduling opportunity since A allocates RACK_LOCAL
stubQueueAllocation(a, clusterResource, node_1, 2*GB, NodeType.RACK_LOCAL);
stubQueueAllocation(b, clusterResource, node_1, 1*GB, NodeType.OFF_SWITCH);
root.assignContainers(clusterResource, node_1, false,
root.assignContainers(clusterResource, node_1,
new ResourceLimits(clusterResource));
InOrder allocationOrder = inOrder(a, b);
allocationOrder.verify(a).assignContainers(eq(clusterResource),
any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits());
any(FiCaSchedulerNode.class), anyResourceLimits());
allocationOrder.verify(b).assignContainers(eq(clusterResource),
any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits());
any(FiCaSchedulerNode.class), anyResourceLimits());
verifyQueueMetrics(a, 2*GB, clusterResource);
verifyQueueMetrics(b, 2*GB, clusterResource);
@ -635,13 +650,13 @@ when(queue).assignContainers(eq(clusterResource), eq(node), eq(false),
// However, since B returns off-switch, A won't get an opportunity
stubQueueAllocation(a, clusterResource, node_0, 1*GB, NodeType.NODE_LOCAL);
stubQueueAllocation(b, clusterResource, node_0, 2*GB, NodeType.OFF_SWITCH);
root.assignContainers(clusterResource, node_0, false,
root.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource));
allocationOrder = inOrder(b, a);
allocationOrder.verify(b).assignContainers(eq(clusterResource),
any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits());
any(FiCaSchedulerNode.class), anyResourceLimits());
allocationOrder.verify(a).assignContainers(eq(clusterResource),
any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits());
any(FiCaSchedulerNode.class), anyResourceLimits());
verifyQueueMetrics(a, 2*GB, clusterResource);
verifyQueueMetrics(b, 4*GB, clusterResource);
@ -680,7 +695,7 @@ when(queue).assignContainers(eq(clusterResource), eq(node), eq(false),
// Simulate B3 returning a container on node_0
stubQueueAllocation(b2, clusterResource, node_0, 0*GB, NodeType.OFF_SWITCH);
stubQueueAllocation(b3, clusterResource, node_0, 1*GB, NodeType.OFF_SWITCH);
root.assignContainers(clusterResource, node_0, false,
root.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource));
verifyQueueMetrics(b2, 0*GB, clusterResource);
verifyQueueMetrics(b3, 1*GB, clusterResource);
@ -689,13 +704,13 @@ when(queue).assignContainers(eq(clusterResource), eq(node), eq(false),
// also, B3 gets a scheduling opportunity since B2 allocates RACK_LOCAL
stubQueueAllocation(b2, clusterResource, node_1, 1*GB, NodeType.RACK_LOCAL);
stubQueueAllocation(b3, clusterResource, node_1, 1*GB, NodeType.OFF_SWITCH);
root.assignContainers(clusterResource, node_1, false,
root.assignContainers(clusterResource, node_1,
new ResourceLimits(clusterResource));
InOrder allocationOrder = inOrder(b2, b3);
allocationOrder.verify(b2).assignContainers(eq(clusterResource),
any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits());
any(FiCaSchedulerNode.class), anyResourceLimits());
allocationOrder.verify(b3).assignContainers(eq(clusterResource),
any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits());
any(FiCaSchedulerNode.class), anyResourceLimits());
verifyQueueMetrics(b2, 1*GB, clusterResource);
verifyQueueMetrics(b3, 2*GB, clusterResource);
@ -704,13 +719,13 @@ when(queue).assignContainers(eq(clusterResource), eq(node), eq(false),
// However, since B3 returns off-switch, B2 won't get an opportunity
stubQueueAllocation(b2, clusterResource, node_0, 1*GB, NodeType.NODE_LOCAL);
stubQueueAllocation(b3, clusterResource, node_0, 1*GB, NodeType.OFF_SWITCH);
root.assignContainers(clusterResource, node_0, false,
root.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource));
allocationOrder = inOrder(b3, b2);
allocationOrder.verify(b3).assignContainers(eq(clusterResource),
any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits());
any(FiCaSchedulerNode.class), anyResourceLimits());
allocationOrder.verify(b2).assignContainers(eq(clusterResource),
any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits());
any(FiCaSchedulerNode.class), anyResourceLimits());
verifyQueueMetrics(b2, 1*GB, clusterResource);
verifyQueueMetrics(b3, 3*GB, clusterResource);

View File

@ -265,7 +265,7 @@ public class TestReservations {
// Start testing...
// Only AM
a.assignContainers(clusterResource, node_0, false,
a.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource));
assertEquals(2 * GB, a.getUsedResources().getMemory());
assertEquals(2 * GB, app_0.getCurrentConsumption().getMemory());
@ -277,7 +277,7 @@ public class TestReservations {
assertEquals(0 * GB, node_2.getUsedResource().getMemory());
// Only 1 map - simulating reduce
a.assignContainers(clusterResource, node_0, false,
a.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource));
assertEquals(5 * GB, a.getUsedResources().getMemory());
assertEquals(5 * GB, app_0.getCurrentConsumption().getMemory());
@ -289,7 +289,7 @@ public class TestReservations {
assertEquals(0 * GB, node_2.getUsedResource().getMemory());
// Only 1 map to other node - simulating reduce
a.assignContainers(clusterResource, node_1, false,
a.assignContainers(clusterResource, node_1,
new ResourceLimits(clusterResource));
assertEquals(8 * GB, a.getUsedResources().getMemory());
assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory());
@ -304,7 +304,7 @@ public class TestReservations {
assertEquals(2, app_0.getTotalRequiredResources(priorityReduce));
// try to assign reducer (5G on node 0 and should reserve)
a.assignContainers(clusterResource, node_0, false,
a.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource));
assertEquals(13 * GB, a.getUsedResources().getMemory());
assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory());
@ -320,7 +320,7 @@ public class TestReservations {
assertEquals(2, app_0.getTotalRequiredResources(priorityReduce));
// assign reducer to node 2
a.assignContainers(clusterResource, node_2, false,
a.assignContainers(clusterResource, node_2,
new ResourceLimits(clusterResource));
assertEquals(18 * GB, a.getUsedResources().getMemory());
assertEquals(13 * GB, app_0.getCurrentConsumption().getMemory());
@ -337,7 +337,7 @@ public class TestReservations {
// node_1 heartbeat and unreserves from node_0 in order to allocate
// on node_1
a.assignContainers(clusterResource, node_1, false,
a.assignContainers(clusterResource, node_1,
new ResourceLimits(clusterResource));
assertEquals(18 * GB, a.getUsedResources().getMemory());
assertEquals(18 * GB, app_0.getCurrentConsumption().getMemory());
@ -421,7 +421,7 @@ public class TestReservations {
// Start testing...
// Only AM
a.assignContainers(clusterResource, node_0, false,
a.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource));
assertEquals(2 * GB, a.getUsedResources().getMemory());
assertEquals(2 * GB, app_0.getCurrentConsumption().getMemory());
@ -433,7 +433,7 @@ public class TestReservations {
assertEquals(0 * GB, node_2.getUsedResource().getMemory());
// Only 1 map - simulating reduce
a.assignContainers(clusterResource, node_0, false,
a.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource));
assertEquals(5 * GB, a.getUsedResources().getMemory());
assertEquals(5 * GB, app_0.getCurrentConsumption().getMemory());
@ -445,7 +445,7 @@ public class TestReservations {
assertEquals(0 * GB, node_2.getUsedResource().getMemory());
// Only 1 map to other node - simulating reduce
a.assignContainers(clusterResource, node_1, false,
a.assignContainers(clusterResource, node_1,
new ResourceLimits(clusterResource));
assertEquals(8 * GB, a.getUsedResources().getMemory());
assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory());
@ -460,7 +460,7 @@ public class TestReservations {
assertEquals(2, app_0.getTotalRequiredResources(priorityReduce));
// try to assign reducer (5G on node 0 and should reserve)
a.assignContainers(clusterResource, node_0, false,
a.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource));
assertEquals(13 * GB, a.getUsedResources().getMemory());
assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory());
@ -476,7 +476,7 @@ public class TestReservations {
assertEquals(2, app_0.getTotalRequiredResources(priorityReduce));
// assign reducer to node 2
a.assignContainers(clusterResource, node_2, false,
a.assignContainers(clusterResource, node_2,
new ResourceLimits(clusterResource));
assertEquals(18 * GB, a.getUsedResources().getMemory());
assertEquals(13 * GB, app_0.getCurrentConsumption().getMemory());
@ -493,7 +493,7 @@ public class TestReservations {
// node_1 heartbeat and won't unreserve from node_0, potentially stuck
// if AM doesn't handle
a.assignContainers(clusterResource, node_1, false,
a.assignContainers(clusterResource, node_1,
new ResourceLimits(clusterResource));
assertEquals(18 * GB, a.getUsedResources().getMemory());
assertEquals(13 * GB, app_0.getCurrentConsumption().getMemory());
@ -569,7 +569,7 @@ public class TestReservations {
// Start testing...
// Only AM
a.assignContainers(clusterResource, node_0, false,
a.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource));
assertEquals(2 * GB, a.getUsedResources().getMemory());
assertEquals(2 * GB, app_0.getCurrentConsumption().getMemory());
@ -580,7 +580,7 @@ public class TestReservations {
assertEquals(0 * GB, node_1.getUsedResource().getMemory());
// Only 1 map - simulating reduce
a.assignContainers(clusterResource, node_0, false,
a.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource));
assertEquals(5 * GB, a.getUsedResources().getMemory());
assertEquals(5 * GB, app_0.getCurrentConsumption().getMemory());
@ -591,7 +591,7 @@ public class TestReservations {
assertEquals(0 * GB, node_1.getUsedResource().getMemory());
// Only 1 map to other node - simulating reduce
a.assignContainers(clusterResource, node_1, false,
a.assignContainers(clusterResource, node_1,
new ResourceLimits(clusterResource));
assertEquals(8 * GB, a.getUsedResources().getMemory());
assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory());
@ -605,7 +605,7 @@ public class TestReservations {
assertEquals(2, app_0.getTotalRequiredResources(priorityReduce));
// try to assign reducer (5G on node 0 and should reserve)
a.assignContainers(clusterResource, node_0, false,
a.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource));
assertEquals(13 * GB, a.getUsedResources().getMemory());
assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory());
@ -620,7 +620,7 @@ public class TestReservations {
assertEquals(2, app_0.getTotalRequiredResources(priorityReduce));
// could allocate but told need to unreserve first
a.assignContainers(clusterResource, node_1, true,
a.assignContainers(clusterResource, node_1,
new ResourceLimits(clusterResource));
assertEquals(13 * GB, a.getUsedResources().getMemory());
assertEquals(13 * GB, app_0.getCurrentConsumption().getMemory());
@ -652,6 +652,8 @@ public class TestReservations {
String host_1 = "host_1";
FiCaSchedulerNode node_1 = TestUtils.getMockNode(host_1, DEFAULT_RACK, 0,
8 * GB);
Resource clusterResource = Resources.createResource(2 * 8 * GB);
// Setup resource-requests
Priority priorityMap = TestUtils.createMockPriority(5);
@ -681,23 +683,28 @@ public class TestReservations {
node_0.getNodeID(), "user", rmContext);
// no reserved containers
NodeId unreserveId = app_0.getNodeIdToUnreserve(priorityMap, capability);
NodeId unreserveId =
app_0.getNodeIdToUnreserve(priorityMap, capability,
cs.getResourceCalculator(), clusterResource);
assertEquals(null, unreserveId);
// no reserved containers - reserve then unreserve
app_0.reserve(node_0, priorityMap, rmContainer_1, container_1);
app_0.unreserve(node_0, priorityMap);
unreserveId = app_0.getNodeIdToUnreserve(priorityMap, capability);
unreserveId = app_0.getNodeIdToUnreserve(priorityMap, capability,
cs.getResourceCalculator(), clusterResource);
assertEquals(null, unreserveId);
// no container large enough is reserved
app_0.reserve(node_0, priorityMap, rmContainer_1, container_1);
unreserveId = app_0.getNodeIdToUnreserve(priorityMap, capability);
unreserveId = app_0.getNodeIdToUnreserve(priorityMap, capability,
cs.getResourceCalculator(), clusterResource);
assertEquals(null, unreserveId);
// reserve one that is now large enough
app_0.reserve(node_1, priorityMap, rmContainer, container);
unreserveId = app_0.getNodeIdToUnreserve(priorityMap, capability);
unreserveId = app_0.getNodeIdToUnreserve(priorityMap, capability,
cs.getResourceCalculator(), clusterResource);
assertEquals(node_1.getNodeID(), unreserveId);
}
@ -741,14 +748,14 @@ public class TestReservations {
// nothing reserved
boolean res = a.findNodeToUnreserve(csContext.getClusterResource(),
node_1, app_0, priorityMap, capability);
node_1, app_0, priorityMap, capability, capability);
assertFalse(res);
// reserved but scheduler doesn't know about that node.
app_0.reserve(node_1, priorityMap, rmContainer, container);
node_1.reserveResource(app_0, priorityMap, rmContainer);
res = a.findNodeToUnreserve(csContext.getClusterResource(), node_1, app_0,
priorityMap, capability);
priorityMap, capability, capability);
assertFalse(res);
}
@ -815,7 +822,7 @@ public class TestReservations {
// Start testing...
// Only AM
a.assignContainers(clusterResource, node_0, false,
a.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource));
assertEquals(2 * GB, a.getUsedResources().getMemory());
assertEquals(2 * GB, app_0.getCurrentConsumption().getMemory());
@ -826,7 +833,7 @@ public class TestReservations {
assertEquals(0 * GB, node_1.getUsedResource().getMemory());
// Only 1 map - simulating reduce
a.assignContainers(clusterResource, node_0, false,
a.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource));
assertEquals(5 * GB, a.getUsedResources().getMemory());
assertEquals(5 * GB, app_0.getCurrentConsumption().getMemory());
@ -837,7 +844,7 @@ public class TestReservations {
assertEquals(0 * GB, node_1.getUsedResource().getMemory());
// Only 1 map to other node - simulating reduce
a.assignContainers(clusterResource, node_1, false,
a.assignContainers(clusterResource, node_1,
new ResourceLimits(clusterResource));
assertEquals(8 * GB, a.getUsedResources().getMemory());
assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory());
@ -852,14 +859,15 @@ public class TestReservations {
// absoluteMaxCapacity
Resource capability = Resources.createResource(32 * GB, 0);
boolean res =
a.canAssignToThisQueue(clusterResource, capability,
CommonNodeLabelsManager.EMPTY_STRING_SET, app_0, true);
a.canAssignToThisQueue(clusterResource,
CommonNodeLabelsManager.EMPTY_STRING_SET, new ResourceLimits(
clusterResource), capability, Resources.none());
assertFalse(res);
// now add in reservations and make sure it continues if config set
// allocate to queue so that the potential new capacity is greater then
// absoluteMaxCapacity
a.assignContainers(clusterResource, node_0, false,
a.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource));
assertEquals(13 * GB, a.getUsedResources().getMemory());
assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory());
@ -872,14 +880,17 @@ public class TestReservations {
capability = Resources.createResource(5 * GB, 0);
res =
a.canAssignToThisQueue(clusterResource, capability,
CommonNodeLabelsManager.EMPTY_STRING_SET, app_0, true);
a.canAssignToThisQueue(clusterResource,
CommonNodeLabelsManager.EMPTY_STRING_SET, new ResourceLimits(
clusterResource), capability, Resources
.createResource(5 * GB));
assertTrue(res);
// tell to not check reservations
res =
a.canAssignToThisQueue(clusterResource, capability,
CommonNodeLabelsManager.EMPTY_STRING_SET, app_0, false);
a.canAssignToThisQueue(clusterResource,
CommonNodeLabelsManager.EMPTY_STRING_SET, new ResourceLimits(
clusterResource), capability, Resources.none());
assertFalse(res);
refreshQueuesTurnOffReservationsContLook(a, csConf);
@ -887,13 +898,16 @@ public class TestReservations {
// should return false no matter what checkReservations is passed
// in since feature is off
res =
a.canAssignToThisQueue(clusterResource, capability,
CommonNodeLabelsManager.EMPTY_STRING_SET, app_0, false);
a.canAssignToThisQueue(clusterResource,
CommonNodeLabelsManager.EMPTY_STRING_SET, new ResourceLimits(
clusterResource), capability, Resources.none());
assertFalse(res);
res =
a.canAssignToThisQueue(clusterResource, capability,
CommonNodeLabelsManager.EMPTY_STRING_SET, app_0, true);
a.canAssignToThisQueue(clusterResource,
CommonNodeLabelsManager.EMPTY_STRING_SET, new ResourceLimits(
clusterResource), capability, Resources
.createResource(5 * GB));
assertFalse(res);
}
@ -984,16 +998,16 @@ public class TestReservations {
app_0.updateResourceRequests(Collections.singletonList(TestUtils
.createResourceRequest(ResourceRequest.ANY, 2 * GB, 1, true,
priorityAM, recordFactory)));
app_0.updateResourceRequests(Collections.singletonList(TestUtils
.createResourceRequest(ResourceRequest.ANY, 5 * GB, 2, true,
priorityReduce, recordFactory)));
app_0.updateResourceRequests(Collections.singletonList(TestUtils
.createResourceRequest(ResourceRequest.ANY, 3 * GB, 2, true,
priorityMap, recordFactory)));
app_0.updateResourceRequests(Collections.singletonList(TestUtils
.createResourceRequest(ResourceRequest.ANY, 5 * GB, 2, true,
priorityReduce, recordFactory)));
// Start testing...
// Only AM
a.assignContainers(clusterResource, node_0, false,
a.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource));
assertEquals(2 * GB, a.getUsedResources().getMemory());
assertEquals(2 * GB, app_0.getCurrentConsumption().getMemory());
@ -1004,7 +1018,7 @@ public class TestReservations {
assertEquals(0 * GB, node_1.getUsedResource().getMemory());
// Only 1 map - simulating reduce
a.assignContainers(clusterResource, node_0, false,
a.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource));
assertEquals(5 * GB, a.getUsedResources().getMemory());
assertEquals(5 * GB, app_0.getCurrentConsumption().getMemory());
@ -1015,7 +1029,7 @@ public class TestReservations {
assertEquals(0 * GB, node_1.getUsedResource().getMemory());
// Only 1 map to other node - simulating reduce
a.assignContainers(clusterResource, node_1, false,
a.assignContainers(clusterResource, node_1,
new ResourceLimits(clusterResource));
assertEquals(8 * GB, a.getUsedResources().getMemory());
assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory());
@ -1029,7 +1043,7 @@ public class TestReservations {
// now add in reservations and make sure it continues if config set
// allocate to queue so that the potential new capacity is greater then
// absoluteMaxCapacity
a.assignContainers(clusterResource, node_0, false,
a.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource));
assertEquals(13 * GB, a.getUsedResources().getMemory());
assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory());
@ -1116,19 +1130,19 @@ public class TestReservations {
app_0.updateResourceRequests(Collections.singletonList(TestUtils
.createResourceRequest(ResourceRequest.ANY, 2 * GB, 1, true,
priorityAM, recordFactory)));
app_0.updateResourceRequests(Collections.singletonList(TestUtils
.createResourceRequest(ResourceRequest.ANY, 5 * GB, 1, true,
priorityReduce, recordFactory)));
app_0.updateResourceRequests(Collections.singletonList(TestUtils
.createResourceRequest(ResourceRequest.ANY, 3 * GB, 2, true,
priorityMap, recordFactory)));
app_0.updateResourceRequests(Collections.singletonList(TestUtils
.createResourceRequest(ResourceRequest.ANY, 5 * GB, 1, true,
priorityReduce, recordFactory)));
app_0.updateResourceRequests(Collections.singletonList(TestUtils
.createResourceRequest(ResourceRequest.ANY, 8 * GB, 2, true,
priorityLast, recordFactory)));
// Start testing...
// Only AM
a.assignContainers(clusterResource, node_0, false,
a.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource));
assertEquals(2 * GB, a.getUsedResources().getMemory());
assertEquals(2 * GB, app_0.getCurrentConsumption().getMemory());
@ -1140,7 +1154,7 @@ public class TestReservations {
assertEquals(0 * GB, node_2.getUsedResource().getMemory());
// Only 1 map - simulating reduce
a.assignContainers(clusterResource, node_0, false,
a.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource));
assertEquals(5 * GB, a.getUsedResources().getMemory());
assertEquals(5 * GB, app_0.getCurrentConsumption().getMemory());
@ -1152,7 +1166,7 @@ public class TestReservations {
assertEquals(0 * GB, node_2.getUsedResource().getMemory());
// Only 1 map to other node - simulating reduce
a.assignContainers(clusterResource, node_1, false,
a.assignContainers(clusterResource, node_1,
new ResourceLimits(clusterResource));
assertEquals(8 * GB, a.getUsedResources().getMemory());
assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory());
@ -1164,38 +1178,41 @@ public class TestReservations {
assertEquals(3 * GB, node_1.getUsedResource().getMemory());
assertEquals(0 * GB, node_2.getUsedResource().getMemory());
// try to assign reducer (5G on node 0), but tell it
// it has to unreserve. No room to allocate and shouldn't reserve
// since nothing currently reserved.
a.assignContainers(clusterResource, node_0, true,
new ResourceLimits(clusterResource));
// try to assign reducer (5G on node 0), but tell it's resource limits <
// used (8G) + required (5G). It will not reserved since it has to unreserve
// some resource. Even with continous reservation looking, we don't allow
// unreserve resource to reserve container.
a.assignContainers(clusterResource, node_0,
new ResourceLimits(Resources.createResource(10 * GB)));
assertEquals(8 * GB, a.getUsedResources().getMemory());
assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0 * GB, a.getMetrics().getReservedMB());
assertEquals(8 * GB, a.getMetrics().getAllocatedMB());
assertEquals(16 * GB, a.getMetrics().getAvailableMB());
assertEquals(16 * GB, app_0.getHeadroom().getMemory());
// app_0's headroom = limit (10G) - used (8G) = 2G
assertEquals(2 * GB, app_0.getHeadroom().getMemory());
assertEquals(5 * GB, node_0.getUsedResource().getMemory());
assertEquals(3 * GB, node_1.getUsedResource().getMemory());
assertEquals(0 * GB, node_2.getUsedResource().getMemory());
// try to assign reducer (5G on node 2), but tell it
// it has to unreserve. Has room but shouldn't reserve
// since nothing currently reserved.
a.assignContainers(clusterResource, node_2, true,
new ResourceLimits(clusterResource));
// try to assign reducer (5G on node 0), but tell it's resource limits <
// used (8G) + required (5G). It will not reserved since it has to unreserve
// some resource. Unfortunately, there's nothing to unreserve.
a.assignContainers(clusterResource, node_2,
new ResourceLimits(Resources.createResource(10 * GB)));
assertEquals(8 * GB, a.getUsedResources().getMemory());
assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0 * GB, a.getMetrics().getReservedMB());
assertEquals(8 * GB, a.getMetrics().getAllocatedMB());
assertEquals(16 * GB, a.getMetrics().getAvailableMB());
assertEquals(16 * GB, app_0.getHeadroom().getMemory());
// app_0's headroom = limit (10G) - used (8G) = 2G
assertEquals(2 * GB, app_0.getHeadroom().getMemory());
assertEquals(5 * GB, node_0.getUsedResource().getMemory());
assertEquals(3 * GB, node_1.getUsedResource().getMemory());
assertEquals(0 * GB, node_2.getUsedResource().getMemory());
// let it assign 5G to node_2
a.assignContainers(clusterResource, node_2, false,
a.assignContainers(clusterResource, node_2,
new ResourceLimits(clusterResource));
assertEquals(13 * GB, a.getUsedResources().getMemory());
assertEquals(13 * GB, app_0.getCurrentConsumption().getMemory());
@ -1208,7 +1225,7 @@ public class TestReservations {
assertEquals(5 * GB, node_2.getUsedResource().getMemory());
// reserve 8G node_0
a.assignContainers(clusterResource, node_0, false,
a.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource));
assertEquals(21 * GB, a.getUsedResources().getMemory());
assertEquals(13 * GB, app_0.getCurrentConsumption().getMemory());
@ -1223,7 +1240,7 @@ public class TestReservations {
// try to assign (8G on node 2). No room to allocate,
// continued to try due to having reservation above,
// but hits queue limits so can't reserve anymore.
a.assignContainers(clusterResource, node_2, false,
a.assignContainers(clusterResource, node_2,
new ResourceLimits(clusterResource));
assertEquals(21 * GB, a.getUsedResources().getMemory());
assertEquals(13 * GB, app_0.getCurrentConsumption().getMemory());