YARN-3243. CapacityScheduler should pass headroom from parent to children to make sure ParentQueue obey its capacity limits. Contributed by Wangda Tan.
This commit is contained in:
parent
a89b087c45
commit
487374b7fe
|
@ -56,6 +56,9 @@ Release 2.8.0 - UNRELEASED
|
|||
|
||||
IMPROVEMENTS
|
||||
|
||||
YARN-3243. CapacityScheduler should pass headroom from parent to children
|
||||
to make sure ParentQueue obey its capacity limits. (Wangda Tan via jianhe)
|
||||
|
||||
OPTIMIZATIONS
|
||||
|
||||
YARN-3339. TestDockerContainerExecutor should pull a single image and not
|
||||
|
|
|
@ -20,10 +20,13 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
|
|||
|
||||
import java.io.IOException;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
import org.apache.commons.lang.StringUtils;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.security.authorize.AccessControlList;
|
||||
|
@ -34,6 +37,7 @@ import org.apache.hadoop.yarn.api.records.Resource;
|
|||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.factories.RecordFactory;
|
||||
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
||||
import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
|
||||
import org.apache.hadoop.yarn.security.AccessType;
|
||||
import org.apache.hadoop.yarn.security.PrivilegedEntity;
|
||||
import org.apache.hadoop.yarn.security.PrivilegedEntity.EntityType;
|
||||
|
@ -49,6 +53,7 @@ import org.apache.hadoop.yarn.util.resource.Resources;
|
|||
import com.google.common.collect.Sets;
|
||||
|
||||
public abstract class AbstractCSQueue implements CSQueue {
|
||||
private static final Log LOG = LogFactory.getLog(AbstractCSQueue.class);
|
||||
|
||||
CSQueue parent;
|
||||
final String queueName;
|
||||
|
@ -406,21 +411,102 @@ public abstract class AbstractCSQueue implements CSQueue {
|
|||
parentQ.getPreemptionDisabled());
|
||||
}
|
||||
|
||||
protected Resource getCurrentResourceLimit(Resource clusterResource,
|
||||
ResourceLimits currentResourceLimits) {
|
||||
private Resource getCurrentLimitResource(String nodeLabel,
|
||||
Resource clusterResource, ResourceLimits currentResourceLimits) {
|
||||
/*
|
||||
* Queue's max available resource = min(my.max, my.limit)
|
||||
* my.limit is set by my parent, considered used resource of my siblings
|
||||
* Current limit resource: For labeled resource: limit = queue-max-resource
|
||||
* (TODO, this part need update when we support labeled-limit) For
|
||||
* non-labeled resource: limit = min(queue-max-resource,
|
||||
* limit-set-by-parent)
|
||||
*/
|
||||
Resource queueMaxResource =
|
||||
Resources.multiplyAndNormalizeDown(resourceCalculator, clusterResource,
|
||||
queueCapacities.getAbsoluteMaximumCapacity(), minimumAllocation);
|
||||
Resource queueCurrentResourceLimit =
|
||||
Resources.min(resourceCalculator, clusterResource, queueMaxResource,
|
||||
currentResourceLimits.getLimit());
|
||||
queueCurrentResourceLimit =
|
||||
Resources.roundDown(resourceCalculator, queueCurrentResourceLimit,
|
||||
minimumAllocation);
|
||||
return queueCurrentResourceLimit;
|
||||
Resources.multiplyAndNormalizeDown(resourceCalculator,
|
||||
labelManager.getResourceByLabel(nodeLabel, clusterResource),
|
||||
queueCapacities.getAbsoluteMaximumCapacity(nodeLabel), minimumAllocation);
|
||||
if (nodeLabel.equals(RMNodeLabelsManager.NO_LABEL)) {
|
||||
return Resources.min(resourceCalculator, clusterResource,
|
||||
queueMaxResource, currentResourceLimits.getLimit());
|
||||
}
|
||||
return queueMaxResource;
|
||||
}
|
||||
|
||||
synchronized boolean canAssignToThisQueue(Resource clusterResource,
|
||||
Set<String> nodeLabels, ResourceLimits currentResourceLimits,
|
||||
Resource nowRequired, Resource resourceCouldBeUnreserved) {
|
||||
// Get label of this queue can access, it's (nodeLabel AND queueLabel)
|
||||
Set<String> labelCanAccess;
|
||||
if (null == nodeLabels || nodeLabels.isEmpty()) {
|
||||
labelCanAccess = new HashSet<String>();
|
||||
// Any queue can always access any node without label
|
||||
labelCanAccess.add(RMNodeLabelsManager.NO_LABEL);
|
||||
} else {
|
||||
labelCanAccess = new HashSet<String>(
|
||||
accessibleLabels.contains(CommonNodeLabelsManager.ANY) ? nodeLabels
|
||||
: Sets.intersection(accessibleLabels, nodeLabels));
|
||||
}
|
||||
|
||||
for (String label : labelCanAccess) {
|
||||
// New total resource = used + required
|
||||
Resource newTotalResource =
|
||||
Resources.add(queueUsage.getUsed(label), nowRequired);
|
||||
|
||||
Resource currentLimitResource =
|
||||
getCurrentLimitResource(label, clusterResource, currentResourceLimits);
|
||||
|
||||
// if reservation continous looking enabled, check to see if could we
|
||||
// potentially use this node instead of a reserved node if the application
|
||||
// has reserved containers.
|
||||
// TODO, now only consider reservation cases when the node has no label
|
||||
if (this.reservationsContinueLooking
|
||||
&& label.equals(RMNodeLabelsManager.NO_LABEL)
|
||||
&& Resources.greaterThan(resourceCalculator, clusterResource,
|
||||
resourceCouldBeUnreserved, Resources.none())) {
|
||||
// resource-without-reserved = used - reserved
|
||||
Resource newTotalWithoutReservedResource =
|
||||
Resources.subtract(newTotalResource, resourceCouldBeUnreserved);
|
||||
|
||||
// when total-used-without-reserved-resource < currentLimit, we still
|
||||
// have chance to allocate on this node by unreserving some containers
|
||||
if (Resources.lessThan(resourceCalculator, clusterResource,
|
||||
newTotalWithoutReservedResource, currentLimitResource)) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("try to use reserved: " + getQueueName()
|
||||
+ " usedResources: " + queueUsage.getUsed()
|
||||
+ ", clusterResources: " + clusterResource
|
||||
+ ", reservedResources: " + resourceCouldBeUnreserved
|
||||
+ ", capacity-without-reserved: "
|
||||
+ newTotalWithoutReservedResource + ", maxLimitCapacity: "
|
||||
+ currentLimitResource);
|
||||
}
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
// Otherwise, if any of the label of this node beyond queue limit, we
|
||||
// cannot allocate on this node. Consider a small epsilon here.
|
||||
if (Resources.greaterThan(resourceCalculator, clusterResource,
|
||||
newTotalResource, currentLimitResource)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug(getQueueName()
|
||||
+ "Check assign to queue, label=" + label
|
||||
+ " usedResources: " + queueUsage.getUsed(label)
|
||||
+ " clusterResources: " + clusterResource
|
||||
+ " currentUsedCapacity "
|
||||
+ Resources.divide(resourceCalculator, clusterResource,
|
||||
queueUsage.getUsed(label),
|
||||
labelManager.getResourceByLabel(label, clusterResource))
|
||||
+ " max-capacity: "
|
||||
+ queueCapacities.getAbsoluteMaximumCapacity(label)
|
||||
+ ")");
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
// Actually, this will not happen, since labelCanAccess will be always
|
||||
// non-empty
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -189,13 +189,11 @@ extends org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue {
|
|||
* Assign containers to applications in the queue or it's children (if any).
|
||||
* @param clusterResource the resource of the cluster.
|
||||
* @param node node on which resources are available
|
||||
* @param needToUnreserve assign container only if it can unreserve one first
|
||||
* @param resourceLimits how much overall resource of this queue can use.
|
||||
* @return the assignment
|
||||
*/
|
||||
public CSAssignment assignContainers(Resource clusterResource,
|
||||
FiCaSchedulerNode node, boolean needToUnreserve,
|
||||
ResourceLimits resourceLimits);
|
||||
FiCaSchedulerNode node, ResourceLimits resourceLimits);
|
||||
|
||||
/**
|
||||
* A container assigned to the queue has completed.
|
||||
|
|
|
@ -1061,9 +1061,14 @@ public class CapacityScheduler extends
|
|||
node.getNodeID());
|
||||
|
||||
LeafQueue queue = ((LeafQueue)reservedApplication.getQueue());
|
||||
CSAssignment assignment = queue.assignContainers(clusterResource, node,
|
||||
false, new ResourceLimits(
|
||||
clusterResource));
|
||||
CSAssignment assignment =
|
||||
queue.assignContainers(
|
||||
clusterResource,
|
||||
node,
|
||||
// TODO, now we only consider limits for parent for non-labeled
|
||||
// resources, should consider labeled resources as well.
|
||||
new ResourceLimits(labelManager.getResourceByLabel(
|
||||
RMNodeLabelsManager.NO_LABEL, clusterResource)));
|
||||
|
||||
RMContainer excessReservation = assignment.getExcessReservation();
|
||||
if (excessReservation != null) {
|
||||
|
@ -1087,8 +1092,13 @@ public class CapacityScheduler extends
|
|||
LOG.debug("Trying to schedule on node: " + node.getNodeName() +
|
||||
", available: " + node.getAvailableResource());
|
||||
}
|
||||
root.assignContainers(clusterResource, node, false, new ResourceLimits(
|
||||
clusterResource));
|
||||
root.assignContainers(
|
||||
clusterResource,
|
||||
node,
|
||||
// TODO, now we only consider limits for parent for non-labeled
|
||||
// resources, should consider labeled resources as well.
|
||||
new ResourceLimits(labelManager.getResourceByLabel(
|
||||
RMNodeLabelsManager.NO_LABEL, clusterResource)));
|
||||
}
|
||||
} else {
|
||||
LOG.info("Skipping scheduling since node " + node.getNodeID() +
|
||||
|
@ -1209,6 +1219,13 @@ public class CapacityScheduler extends
|
|||
usePortForNodeName, nodeManager.getNodeLabels());
|
||||
this.nodes.put(nodeManager.getNodeID(), schedulerNode);
|
||||
Resources.addTo(clusterResource, nodeManager.getTotalCapability());
|
||||
|
||||
// update this node to node label manager
|
||||
if (labelManager != null) {
|
||||
labelManager.activateNode(nodeManager.getNodeID(),
|
||||
nodeManager.getTotalCapability());
|
||||
}
|
||||
|
||||
root.updateClusterResource(clusterResource, new ResourceLimits(
|
||||
clusterResource));
|
||||
int numNodes = numNodeManagers.incrementAndGet();
|
||||
|
@ -1220,12 +1237,6 @@ public class CapacityScheduler extends
|
|||
if (scheduleAsynchronously && numNodes == 1) {
|
||||
asyncSchedulerThread.beginSchedule();
|
||||
}
|
||||
|
||||
// update this node to node label manager
|
||||
if (labelManager != null) {
|
||||
labelManager.activateNode(nodeManager.getNodeID(),
|
||||
nodeManager.getTotalCapability());
|
||||
}
|
||||
}
|
||||
|
||||
private synchronized void removeNode(RMNode nodeInfo) {
|
||||
|
|
|
@ -76,7 +76,6 @@ import org.apache.hadoop.yarn.server.utils.Lock.NoLock;
|
|||
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.collect.Sets;
|
||||
|
||||
@Private
|
||||
@Unstable
|
||||
|
@ -157,7 +156,7 @@ public class LeafQueue extends AbstractCSQueue {
|
|||
// and all queues may not be realized yet, we'll use (optimistic)
|
||||
// absoluteMaxCapacity (it will be replaced with the more accurate
|
||||
// absoluteMaxAvailCapacity during headroom/userlimit/allocation events)
|
||||
computeQueueCurrentLimitAndSetHeadroomInfo(clusterResource);
|
||||
setQueueResourceLimitsInfo(clusterResource);
|
||||
|
||||
CapacitySchedulerConfiguration conf = csContext.getConfiguration();
|
||||
userLimit = conf.getUserLimit(getQueuePath());
|
||||
|
@ -739,9 +738,8 @@ public class LeafQueue extends AbstractCSQueue {
|
|||
|
||||
@Override
|
||||
public synchronized CSAssignment assignContainers(Resource clusterResource,
|
||||
FiCaSchedulerNode node, boolean needToUnreserve,
|
||||
ResourceLimits currentResourceLimits) {
|
||||
this.currentResourceLimits = currentResourceLimits;
|
||||
FiCaSchedulerNode node, ResourceLimits currentResourceLimits) {
|
||||
updateCurrentResourceLimits(currentResourceLimits, clusterResource);
|
||||
|
||||
if(LOG.isDebugEnabled()) {
|
||||
LOG.debug("assignContainers: node=" + node.getNodeName()
|
||||
|
@ -796,7 +794,7 @@ public class LeafQueue extends AbstractCSQueue {
|
|||
continue;
|
||||
}
|
||||
if (!this.reservationsContinueLooking) {
|
||||
if (!needContainers(application, priority, required)) {
|
||||
if (!shouldAllocOrReserveNewContainer(application, priority, required)) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("doesn't need containers based on reservation algo!");
|
||||
}
|
||||
|
@ -818,8 +816,8 @@ public class LeafQueue extends AbstractCSQueue {
|
|||
required, requestedNodeLabels);
|
||||
|
||||
// Check queue max-capacity limit
|
||||
if (!canAssignToThisQueue(clusterResource, required,
|
||||
node.getLabels(), application, true)) {
|
||||
if (!super.canAssignToThisQueue(clusterResource, node.getLabels(),
|
||||
this.currentResourceLimits, required, application.getCurrentReservation())) {
|
||||
return NULL_ASSIGNMENT;
|
||||
}
|
||||
|
||||
|
@ -835,7 +833,7 @@ public class LeafQueue extends AbstractCSQueue {
|
|||
// Try to schedule
|
||||
CSAssignment assignment =
|
||||
assignContainersOnNode(clusterResource, node, application, priority,
|
||||
null, needToUnreserve);
|
||||
null);
|
||||
|
||||
// Did the application skip this node?
|
||||
if (assignment.getSkipped()) {
|
||||
|
@ -896,7 +894,7 @@ public class LeafQueue extends AbstractCSQueue {
|
|||
|
||||
// Try to assign if we have sufficient resources
|
||||
assignContainersOnNode(clusterResource, node, application, priority,
|
||||
rmContainer, false);
|
||||
rmContainer);
|
||||
|
||||
// Doesn't matter... since it's already charged for at time of reservation
|
||||
// "re-reservation" is *free*
|
||||
|
@ -938,102 +936,14 @@ public class LeafQueue extends AbstractCSQueue {
|
|||
Resources.roundDown(resourceCalculator, headroom, minimumAllocation);
|
||||
return headroom;
|
||||
}
|
||||
|
||||
synchronized boolean canAssignToThisQueue(Resource clusterResource,
|
||||
Resource required, Set<String> nodeLabels, FiCaSchedulerApp application,
|
||||
boolean checkReservations) {
|
||||
// Get label of this queue can access, it's (nodeLabel AND queueLabel)
|
||||
Set<String> labelCanAccess;
|
||||
if (null == nodeLabels || nodeLabels.isEmpty()) {
|
||||
labelCanAccess = new HashSet<String>();
|
||||
// Any queue can always access any node without label
|
||||
labelCanAccess.add(RMNodeLabelsManager.NO_LABEL);
|
||||
} else {
|
||||
labelCanAccess = new HashSet<String>(Sets.intersection(accessibleLabels, nodeLabels));
|
||||
}
|
||||
|
||||
boolean canAssign = true;
|
||||
for (String label : labelCanAccess) {
|
||||
Resource potentialTotalCapacity =
|
||||
Resources.add(queueUsage.getUsed(label), required);
|
||||
|
||||
float potentialNewCapacity =
|
||||
Resources.divide(resourceCalculator, clusterResource,
|
||||
potentialTotalCapacity,
|
||||
labelManager.getResourceByLabel(label, clusterResource));
|
||||
// if enabled, check to see if could we potentially use this node instead
|
||||
// of a reserved node if the application has reserved containers
|
||||
// TODO, now only consider reservation cases when the node has no label
|
||||
if (this.reservationsContinueLooking && checkReservations
|
||||
&& label.equals(RMNodeLabelsManager.NO_LABEL)) {
|
||||
float potentialNewWithoutReservedCapacity = Resources.divide(
|
||||
resourceCalculator,
|
||||
clusterResource,
|
||||
Resources.subtract(potentialTotalCapacity,
|
||||
application.getCurrentReservation()),
|
||||
labelManager.getResourceByLabel(label, clusterResource));
|
||||
|
||||
if (potentialNewWithoutReservedCapacity <= queueCapacities
|
||||
.getAbsoluteMaximumCapacity()) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("try to use reserved: "
|
||||
+ getQueueName()
|
||||
+ " usedResources: "
|
||||
+ queueUsage.getUsed()
|
||||
+ " clusterResources: "
|
||||
+ clusterResource
|
||||
+ " reservedResources: "
|
||||
+ application.getCurrentReservation()
|
||||
+ " currentCapacity "
|
||||
+ Resources.divide(resourceCalculator, clusterResource,
|
||||
queueUsage.getUsed(), clusterResource) + " required " + required
|
||||
+ " potentialNewWithoutReservedCapacity: "
|
||||
+ potentialNewWithoutReservedCapacity + " ( "
|
||||
+ " max-capacity: "
|
||||
+ queueCapacities.getAbsoluteMaximumCapacity() + ")");
|
||||
}
|
||||
// we could potentially use this node instead of reserved node
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
// Otherwise, if any of the label of this node beyond queue limit, we
|
||||
// cannot allocate on this node. Consider a small epsilon here.
|
||||
if (potentialNewCapacity > queueCapacities
|
||||
.getAbsoluteMaximumCapacity(label) + 1e-4) {
|
||||
canAssign = false;
|
||||
break;
|
||||
}
|
||||
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug(getQueueName()
|
||||
+ "Check assign to queue, label=" + label
|
||||
+ " usedResources: " + queueUsage.getUsed(label)
|
||||
+ " clusterResources: " + clusterResource
|
||||
+ " currentCapacity "
|
||||
+ Resources.divide(resourceCalculator, clusterResource,
|
||||
queueUsage.getUsed(label),
|
||||
labelManager.getResourceByLabel(label, clusterResource))
|
||||
+ " potentialNewCapacity: " + potentialNewCapacity + " ( "
|
||||
+ " max-capacity: " + queueCapacities.getAbsoluteMaximumCapacity()
|
||||
+ ")");
|
||||
}
|
||||
}
|
||||
|
||||
return canAssign;
|
||||
}
|
||||
|
||||
private Resource computeQueueCurrentLimitAndSetHeadroomInfo(
|
||||
private void setQueueResourceLimitsInfo(
|
||||
Resource clusterResource) {
|
||||
Resource queueCurrentResourceLimit =
|
||||
getCurrentResourceLimit(clusterResource, currentResourceLimits);
|
||||
|
||||
synchronized (queueResourceLimitsInfo) {
|
||||
queueResourceLimitsInfo.setQueueCurrentLimit(queueCurrentResourceLimit);
|
||||
queueResourceLimitsInfo.setQueueCurrentLimit(currentResourceLimits
|
||||
.getLimit());
|
||||
queueResourceLimitsInfo.setClusterResource(clusterResource);
|
||||
}
|
||||
|
||||
return queueCurrentResourceLimit;
|
||||
}
|
||||
|
||||
@Lock({LeafQueue.class, FiCaSchedulerApp.class})
|
||||
|
@ -1048,16 +958,16 @@ public class LeafQueue extends AbstractCSQueue {
|
|||
computeUserLimit(application, clusterResource, required,
|
||||
queueUser, requestedLabels);
|
||||
|
||||
Resource currentResourceLimit =
|
||||
computeQueueCurrentLimitAndSetHeadroomInfo(clusterResource);
|
||||
setQueueResourceLimitsInfo(clusterResource);
|
||||
|
||||
Resource headroom =
|
||||
getHeadroom(queueUser, currentResourceLimit, clusterResource, userLimit);
|
||||
getHeadroom(queueUser, currentResourceLimits.getLimit(),
|
||||
clusterResource, userLimit);
|
||||
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Headroom calculation for user " + user + ": " +
|
||||
" userLimit=" + userLimit +
|
||||
" queueMaxAvailRes=" + currentResourceLimit +
|
||||
" queueMaxAvailRes=" + currentResourceLimits.getLimit() +
|
||||
" consumed=" + queueUser.getUsed() +
|
||||
" headroom=" + headroom);
|
||||
}
|
||||
|
@ -1207,8 +1117,8 @@ public class LeafQueue extends AbstractCSQueue {
|
|||
return true;
|
||||
}
|
||||
|
||||
boolean needContainers(FiCaSchedulerApp application, Priority priority,
|
||||
Resource required) {
|
||||
boolean shouldAllocOrReserveNewContainer(FiCaSchedulerApp application,
|
||||
Priority priority, Resource required) {
|
||||
int requiredContainers = application.getTotalRequiredResources(priority);
|
||||
int reservedContainers = application.getNumReservedContainers(priority);
|
||||
int starvation = 0;
|
||||
|
@ -1240,7 +1150,7 @@ public class LeafQueue extends AbstractCSQueue {
|
|||
|
||||
private CSAssignment assignContainersOnNode(Resource clusterResource,
|
||||
FiCaSchedulerNode node, FiCaSchedulerApp application, Priority priority,
|
||||
RMContainer reservedContainer, boolean needToUnreserve) {
|
||||
RMContainer reservedContainer) {
|
||||
Resource assigned = Resources.none();
|
||||
|
||||
NodeType requestType = null;
|
||||
|
@ -1252,7 +1162,7 @@ public class LeafQueue extends AbstractCSQueue {
|
|||
requestType = NodeType.NODE_LOCAL;
|
||||
assigned =
|
||||
assignNodeLocalContainers(clusterResource, nodeLocalResourceRequest,
|
||||
node, application, priority, reservedContainer, needToUnreserve,
|
||||
node, application, priority, reservedContainer,
|
||||
allocatedContainer);
|
||||
if (Resources.greaterThan(resourceCalculator, clusterResource,
|
||||
assigned, Resources.none())) {
|
||||
|
@ -1280,7 +1190,7 @@ public class LeafQueue extends AbstractCSQueue {
|
|||
|
||||
assigned =
|
||||
assignRackLocalContainers(clusterResource, rackLocalResourceRequest,
|
||||
node, application, priority, reservedContainer, needToUnreserve,
|
||||
node, application, priority, reservedContainer,
|
||||
allocatedContainer);
|
||||
if (Resources.greaterThan(resourceCalculator, clusterResource,
|
||||
assigned, Resources.none())) {
|
||||
|
@ -1308,7 +1218,7 @@ public class LeafQueue extends AbstractCSQueue {
|
|||
|
||||
assigned =
|
||||
assignOffSwitchContainers(clusterResource, offSwitchResourceRequest,
|
||||
node, application, priority, reservedContainer, needToUnreserve,
|
||||
node, application, priority, reservedContainer,
|
||||
allocatedContainer);
|
||||
|
||||
// update locality statistics
|
||||
|
@ -1320,13 +1230,24 @@ public class LeafQueue extends AbstractCSQueue {
|
|||
|
||||
return SKIP_ASSIGNMENT;
|
||||
}
|
||||
|
||||
private Resource getMinimumResourceNeedUnreserved(Resource askedResource) {
|
||||
// First we need to get minimum resource we need unreserve
|
||||
// minimum-resource-need-unreserve = used + asked - limit
|
||||
Resource minimumUnreservedResource =
|
||||
Resources.subtract(Resources.add(queueUsage.getUsed(), askedResource),
|
||||
currentResourceLimits.getLimit());
|
||||
return minimumUnreservedResource;
|
||||
}
|
||||
|
||||
@Private
|
||||
protected boolean findNodeToUnreserve(Resource clusterResource,
|
||||
FiCaSchedulerNode node, FiCaSchedulerApp application, Priority priority,
|
||||
Resource capability) {
|
||||
Resource askedResource, Resource minimumUnreservedResource) {
|
||||
// need to unreserve some other container first
|
||||
NodeId idToUnreserve = application.getNodeIdToUnreserve(priority, capability);
|
||||
NodeId idToUnreserve =
|
||||
application.getNodeIdToUnreserve(priority, minimumUnreservedResource,
|
||||
resourceCalculator, clusterResource);
|
||||
if (idToUnreserve == null) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("checked to see if could unreserve for app but nothing "
|
||||
|
@ -1343,7 +1264,7 @@ public class LeafQueue extends AbstractCSQueue {
|
|||
LOG.debug("unreserving for app: " + application.getApplicationId()
|
||||
+ " on nodeId: " + idToUnreserve
|
||||
+ " in order to replace reserved application and place it on node: "
|
||||
+ node.getNodeID() + " needing: " + capability);
|
||||
+ node.getNodeID() + " needing: " + askedResource);
|
||||
}
|
||||
|
||||
// headroom
|
||||
|
@ -1364,15 +1285,7 @@ public class LeafQueue extends AbstractCSQueue {
|
|||
|
||||
@Private
|
||||
protected boolean checkLimitsToReserve(Resource clusterResource,
|
||||
FiCaSchedulerApp application, Resource capability,
|
||||
boolean needToUnreserve) {
|
||||
if (needToUnreserve) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("we needed to unreserve to be able to allocate");
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
FiCaSchedulerApp application, Resource capability) {
|
||||
// we can't reserve if we got here based on the limit
|
||||
// checks assuming we could unreserve!!!
|
||||
Resource userLimit = computeUserLimitAndSetHeadroom(application,
|
||||
|
@ -1380,7 +1293,8 @@ public class LeafQueue extends AbstractCSQueue {
|
|||
|
||||
// Check queue max-capacity limit,
|
||||
// TODO: Consider reservation on labels
|
||||
if (!canAssignToThisQueue(clusterResource, capability, null, application, false)) {
|
||||
if (!canAssignToThisQueue(clusterResource, null,
|
||||
this.currentResourceLimits, capability, Resources.none())) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("was going to reserve but hit queue limit");
|
||||
}
|
||||
|
@ -1402,43 +1316,40 @@ public class LeafQueue extends AbstractCSQueue {
|
|||
private Resource assignNodeLocalContainers(Resource clusterResource,
|
||||
ResourceRequest nodeLocalResourceRequest, FiCaSchedulerNode node,
|
||||
FiCaSchedulerApp application, Priority priority,
|
||||
RMContainer reservedContainer, boolean needToUnreserve,
|
||||
MutableObject allocatedContainer) {
|
||||
RMContainer reservedContainer, MutableObject allocatedContainer) {
|
||||
if (canAssign(application, priority, node, NodeType.NODE_LOCAL,
|
||||
reservedContainer)) {
|
||||
return assignContainer(clusterResource, node, application, priority,
|
||||
nodeLocalResourceRequest, NodeType.NODE_LOCAL, reservedContainer,
|
||||
needToUnreserve, allocatedContainer);
|
||||
allocatedContainer);
|
||||
}
|
||||
|
||||
return Resources.none();
|
||||
}
|
||||
|
||||
private Resource assignRackLocalContainers(
|
||||
Resource clusterResource, ResourceRequest rackLocalResourceRequest,
|
||||
FiCaSchedulerNode node, FiCaSchedulerApp application, Priority priority,
|
||||
RMContainer reservedContainer, boolean needToUnreserve,
|
||||
MutableObject allocatedContainer) {
|
||||
private Resource assignRackLocalContainers(Resource clusterResource,
|
||||
ResourceRequest rackLocalResourceRequest, FiCaSchedulerNode node,
|
||||
FiCaSchedulerApp application, Priority priority,
|
||||
RMContainer reservedContainer, MutableObject allocatedContainer) {
|
||||
if (canAssign(application, priority, node, NodeType.RACK_LOCAL,
|
||||
reservedContainer)) {
|
||||
return assignContainer(clusterResource, node, application, priority,
|
||||
rackLocalResourceRequest, NodeType.RACK_LOCAL, reservedContainer,
|
||||
needToUnreserve, allocatedContainer);
|
||||
allocatedContainer);
|
||||
}
|
||||
|
||||
return Resources.none();
|
||||
}
|
||||
|
||||
private Resource assignOffSwitchContainers(
|
||||
Resource clusterResource, ResourceRequest offSwitchResourceRequest,
|
||||
FiCaSchedulerNode node, FiCaSchedulerApp application, Priority priority,
|
||||
RMContainer reservedContainer, boolean needToUnreserve,
|
||||
MutableObject allocatedContainer) {
|
||||
private Resource assignOffSwitchContainers(Resource clusterResource,
|
||||
ResourceRequest offSwitchResourceRequest, FiCaSchedulerNode node,
|
||||
FiCaSchedulerApp application, Priority priority,
|
||||
RMContainer reservedContainer, MutableObject allocatedContainer) {
|
||||
if (canAssign(application, priority, node, NodeType.OFF_SWITCH,
|
||||
reservedContainer)) {
|
||||
return assignContainer(clusterResource, node, application, priority,
|
||||
offSwitchResourceRequest, NodeType.OFF_SWITCH, reservedContainer,
|
||||
needToUnreserve, allocatedContainer);
|
||||
allocatedContainer);
|
||||
}
|
||||
|
||||
return Resources.none();
|
||||
|
@ -1522,13 +1433,12 @@ public class LeafQueue extends AbstractCSQueue {
|
|||
private Resource assignContainer(Resource clusterResource, FiCaSchedulerNode node,
|
||||
FiCaSchedulerApp application, Priority priority,
|
||||
ResourceRequest request, NodeType type, RMContainer rmContainer,
|
||||
boolean needToUnreserve, MutableObject createdContainer) {
|
||||
MutableObject createdContainer) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("assignContainers: node=" + node.getNodeName()
|
||||
+ " application=" + application.getApplicationId()
|
||||
+ " priority=" + priority.getPriority()
|
||||
+ " request=" + request + " type=" + type
|
||||
+ " needToUnreserve= " + needToUnreserve);
|
||||
+ " request=" + request + " type=" + type);
|
||||
}
|
||||
|
||||
// check if the resource request can access the label
|
||||
|
@ -1548,12 +1458,14 @@ public class LeafQueue extends AbstractCSQueue {
|
|||
Resource available = node.getAvailableResource();
|
||||
Resource totalResource = node.getTotalResource();
|
||||
|
||||
if (!Resources.fitsIn(capability, totalResource)) {
|
||||
if (!Resources.lessThanOrEqual(resourceCalculator, clusterResource,
|
||||
capability, totalResource)) {
|
||||
LOG.warn("Node : " + node.getNodeID()
|
||||
+ " does not have sufficient resource for request : " + request
|
||||
+ " node total capability : " + node.getTotalResource());
|
||||
return Resources.none();
|
||||
}
|
||||
|
||||
assert Resources.greaterThan(
|
||||
resourceCalculator, clusterResource, available, Resources.none());
|
||||
|
||||
|
@ -1566,18 +1478,9 @@ public class LeafQueue extends AbstractCSQueue {
|
|||
LOG.warn("Couldn't get container for allocation!");
|
||||
return Resources.none();
|
||||
}
|
||||
|
||||
// default to true since if reservation continue look feature isn't on
|
||||
// needContainers is checked earlier and we wouldn't have gotten this far
|
||||
boolean canAllocContainer = true;
|
||||
if (this.reservationsContinueLooking) {
|
||||
// based on reservations can we allocate/reserve more or do we need
|
||||
// to unreserve one first
|
||||
canAllocContainer = needContainers(application, priority, capability);
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("can alloc container is: " + canAllocContainer);
|
||||
}
|
||||
}
|
||||
|
||||
boolean shouldAllocOrReserveNewContainer = shouldAllocOrReserveNewContainer(
|
||||
application, priority, capability);
|
||||
|
||||
// Can we allocate a container on this node?
|
||||
int availableContainers =
|
||||
|
@ -1588,25 +1491,25 @@ public class LeafQueue extends AbstractCSQueue {
|
|||
// Did we previously reserve containers at this 'priority'?
|
||||
if (rmContainer != null) {
|
||||
unreserve(application, priority, node, rmContainer);
|
||||
} else if (this.reservationsContinueLooking
|
||||
&& (!canAllocContainer || needToUnreserve)) {
|
||||
// need to unreserve some other container first
|
||||
boolean res = findNodeToUnreserve(clusterResource, node, application,
|
||||
priority, capability);
|
||||
if (!res) {
|
||||
return Resources.none();
|
||||
}
|
||||
} else {
|
||||
// we got here by possibly ignoring queue capacity limits. If the
|
||||
// parameter needToUnreserve is true it means we ignored one of those
|
||||
// limits in the chance we could unreserve. If we are here we aren't
|
||||
// trying to unreserve so we can't allocate anymore due to that parent
|
||||
// limit.
|
||||
if (needToUnreserve) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("we needed to unreserve to be able to allocate, skipping");
|
||||
} else if (this.reservationsContinueLooking && node.getLabels().isEmpty()) {
|
||||
// when reservationsContinueLooking is set, we may need to unreserve
|
||||
// some containers to meet this queue and its parents' resource limits
|
||||
// TODO, need change here when we want to support continuous reservation
|
||||
// looking for labeled partitions.
|
||||
Resource minimumUnreservedResource =
|
||||
getMinimumResourceNeedUnreserved(capability);
|
||||
if (!shouldAllocOrReserveNewContainer
|
||||
|| Resources.greaterThan(resourceCalculator, clusterResource,
|
||||
minimumUnreservedResource, Resources.none())) {
|
||||
boolean containerUnreserved =
|
||||
findNodeToUnreserve(clusterResource, node, application, priority,
|
||||
capability, minimumUnreservedResource);
|
||||
// When (minimum-unreserved-resource > 0 OR we cannot allocate new/reserved
|
||||
// container (That means we *have to* unreserve some resource to
|
||||
// continue)). If we failed to unreserve some resource,
|
||||
if (!containerUnreserved) {
|
||||
return Resources.none();
|
||||
}
|
||||
return Resources.none();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1632,17 +1535,16 @@ public class LeafQueue extends AbstractCSQueue {
|
|||
} else {
|
||||
// if we are allowed to allocate but this node doesn't have space, reserve it or
|
||||
// if this was an already a reserved container, reserve it again
|
||||
if ((canAllocContainer) || (rmContainer != null)) {
|
||||
if (shouldAllocOrReserveNewContainer || rmContainer != null) {
|
||||
|
||||
if (reservationsContinueLooking) {
|
||||
// we got here by possibly ignoring parent queue capacity limits. If
|
||||
// the parameter needToUnreserve is true it means we ignored one of
|
||||
// those limits in the chance we could unreserve. If we are here
|
||||
// we aren't trying to unreserve so we can't allocate
|
||||
// anymore due to that parent limit
|
||||
boolean res = checkLimitsToReserve(clusterResource, application, capability,
|
||||
needToUnreserve);
|
||||
if (!res) {
|
||||
if (reservationsContinueLooking && rmContainer == null) {
|
||||
// we could possibly ignoring parent queue capacity limits when
|
||||
// reservationsContinueLooking is set.
|
||||
// If we're trying to reserve a container here, not container will be
|
||||
// unreserved for reserving the new one. Check limits again before
|
||||
// reserve the new container
|
||||
if (!checkLimitsToReserve(clusterResource,
|
||||
application, capability)) {
|
||||
return Resources.none();
|
||||
}
|
||||
}
|
||||
|
@ -1784,18 +1686,36 @@ public class LeafQueue extends AbstractCSQueue {
|
|||
Resources.multiplyAndNormalizeUp(resourceCalculator, clusterResource,
|
||||
queueCapacities.getAbsoluteCapacity(), minimumAllocation);
|
||||
}
|
||||
|
||||
private void updateCurrentResourceLimits(
|
||||
ResourceLimits currentResourceLimits, Resource clusterResource) {
|
||||
// TODO: need consider non-empty node labels when resource limits supports
|
||||
// node labels
|
||||
// Even if ParentQueue will set limits respect child's max queue capacity,
|
||||
// but when allocating reserved container, CapacityScheduler doesn't do
|
||||
// this. So need cap limits by queue's max capacity here.
|
||||
this.currentResourceLimits = currentResourceLimits;
|
||||
Resource queueMaxResource =
|
||||
Resources.multiplyAndNormalizeDown(resourceCalculator, labelManager
|
||||
.getResourceByLabel(RMNodeLabelsManager.NO_LABEL, clusterResource),
|
||||
queueCapacities
|
||||
.getAbsoluteMaximumCapacity(RMNodeLabelsManager.NO_LABEL),
|
||||
minimumAllocation);
|
||||
this.currentResourceLimits.setLimit(Resources.min(resourceCalculator,
|
||||
clusterResource, queueMaxResource, currentResourceLimits.getLimit()));
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void updateClusterResource(Resource clusterResource,
|
||||
ResourceLimits currentResourceLimits) {
|
||||
this.currentResourceLimits = currentResourceLimits;
|
||||
updateCurrentResourceLimits(currentResourceLimits, clusterResource);
|
||||
lastClusterResource = clusterResource;
|
||||
updateAbsoluteCapacityResource(clusterResource);
|
||||
|
||||
// Update headroom info based on new cluster resource value
|
||||
// absoluteMaxCapacity now, will be replaced with absoluteMaxAvailCapacity
|
||||
// during allocation
|
||||
computeQueueCurrentLimitAndSetHeadroomInfo(clusterResource);
|
||||
setQueueResourceLimitsInfo(clusterResource);
|
||||
|
||||
// Update metrics
|
||||
CSQueueUtils.updateQueueStatistics(
|
||||
|
|
|
@ -23,7 +23,6 @@ import java.util.ArrayList;
|
|||
import java.util.Collection;
|
||||
import java.util.Comparator;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
@ -48,7 +47,6 @@ import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
|
|||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.factories.RecordFactory;
|
||||
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
||||
import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
|
||||
import org.apache.hadoop.yarn.security.AccessType;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
|
||||
|
@ -63,8 +61,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaS
|
|||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
|
||||
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||
|
||||
import com.google.common.collect.Sets;
|
||||
|
||||
@Private
|
||||
@Evolving
|
||||
public class ParentQueue extends AbstractCSQueue {
|
||||
|
@ -380,8 +376,7 @@ public class ParentQueue extends AbstractCSQueue {
|
|||
|
||||
@Override
|
||||
public synchronized CSAssignment assignContainers(Resource clusterResource,
|
||||
FiCaSchedulerNode node, boolean needToUnreserve,
|
||||
ResourceLimits resourceLimits) {
|
||||
FiCaSchedulerNode node, ResourceLimits resourceLimits) {
|
||||
CSAssignment assignment =
|
||||
new CSAssignment(Resources.createResource(0, 0), NodeType.NODE_LOCAL);
|
||||
Set<String> nodeLabels = node.getLabels();
|
||||
|
@ -397,21 +392,18 @@ public class ParentQueue extends AbstractCSQueue {
|
|||
+ getQueueName());
|
||||
}
|
||||
|
||||
boolean localNeedToUnreserve = false;
|
||||
|
||||
// Are we over maximum-capacity for this queue?
|
||||
if (!canAssignToThisQueue(clusterResource, nodeLabels)) {
|
||||
// check to see if we could if we unreserve first
|
||||
localNeedToUnreserve = assignToQueueIfUnreserve(clusterResource);
|
||||
if (!localNeedToUnreserve) {
|
||||
break;
|
||||
}
|
||||
// This will also consider parent's limits and also continuous reservation
|
||||
// looking
|
||||
if (!super.canAssignToThisQueue(clusterResource, nodeLabels, resourceLimits,
|
||||
minimumAllocation, Resources.createResource(getMetrics()
|
||||
.getReservedMB(), getMetrics().getReservedVirtualCores()))) {
|
||||
break;
|
||||
}
|
||||
|
||||
// Schedule
|
||||
CSAssignment assignedToChild =
|
||||
assignContainersToChildQueues(clusterResource, node,
|
||||
localNeedToUnreserve | needToUnreserve, resourceLimits);
|
||||
assignContainersToChildQueues(clusterResource, node, resourceLimits);
|
||||
assignment.setType(assignedToChild.getType());
|
||||
|
||||
// Done if no child-queue assigned anything
|
||||
|
@ -459,74 +451,6 @@ public class ParentQueue extends AbstractCSQueue {
|
|||
return assignment;
|
||||
}
|
||||
|
||||
private synchronized boolean canAssignToThisQueue(Resource clusterResource,
|
||||
Set<String> nodeLabels) {
|
||||
Set<String> labelCanAccess =
|
||||
new HashSet<String>(
|
||||
accessibleLabels.contains(CommonNodeLabelsManager.ANY) ? nodeLabels
|
||||
: Sets.intersection(accessibleLabels, nodeLabels));
|
||||
if (nodeLabels.isEmpty()) {
|
||||
// Any queue can always access any node without label
|
||||
labelCanAccess.add(RMNodeLabelsManager.NO_LABEL);
|
||||
}
|
||||
|
||||
boolean canAssign = true;
|
||||
for (String label : labelCanAccess) {
|
||||
float currentAbsoluteLabelUsedCapacity =
|
||||
Resources.divide(resourceCalculator, clusterResource,
|
||||
queueUsage.getUsed(label),
|
||||
labelManager.getResourceByLabel(label, clusterResource));
|
||||
// if any of the label doesn't beyond limit, we can allocate on this node
|
||||
if (currentAbsoluteLabelUsedCapacity >=
|
||||
queueCapacities.getAbsoluteMaximumCapacity(label)) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug(getQueueName() + " used=" + queueUsage.getUsed()
|
||||
+ " current-capacity (" + queueUsage.getUsed(label) + ") "
|
||||
+ " >= max-capacity ("
|
||||
+ labelManager.getResourceByLabel(label, clusterResource) + ")");
|
||||
}
|
||||
canAssign = false;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
return canAssign;
|
||||
}
|
||||
|
||||
|
||||
private synchronized boolean assignToQueueIfUnreserve(Resource clusterResource) {
|
||||
if (this.reservationsContinueLooking) {
|
||||
// check to see if we could potentially use this node instead of a reserved
|
||||
// node
|
||||
|
||||
Resource reservedResources = Resources.createResource(getMetrics()
|
||||
.getReservedMB(), getMetrics().getReservedVirtualCores());
|
||||
float capacityWithoutReservedCapacity = Resources.divide(
|
||||
resourceCalculator, clusterResource,
|
||||
Resources.subtract(queueUsage.getUsed(), reservedResources),
|
||||
clusterResource);
|
||||
|
||||
if (capacityWithoutReservedCapacity <= queueCapacities
|
||||
.getAbsoluteMaximumCapacity()) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("parent: try to use reserved: " + getQueueName()
|
||||
+ " usedResources: " + queueUsage.getUsed().getMemory()
|
||||
+ " clusterResources: " + clusterResource.getMemory()
|
||||
+ " reservedResources: " + reservedResources.getMemory()
|
||||
+ " currentCapacity " + ((float) queueUsage.getUsed().getMemory())
|
||||
/ clusterResource.getMemory()
|
||||
+ " potentialNewWithoutReservedCapacity: "
|
||||
+ capacityWithoutReservedCapacity + " ( " + " max-capacity: "
|
||||
+ queueCapacities.getAbsoluteMaximumCapacity() + ")");
|
||||
}
|
||||
// we could potentially use this node instead of reserved node
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
|
||||
private boolean canAssign(Resource clusterResource, FiCaSchedulerNode node) {
|
||||
return (node.getReservedContainer() == null) &&
|
||||
Resources.greaterThanOrEqual(resourceCalculator, clusterResource,
|
||||
|
@ -534,28 +458,38 @@ public class ParentQueue extends AbstractCSQueue {
|
|||
}
|
||||
|
||||
private ResourceLimits getResourceLimitsOfChild(CSQueue child,
|
||||
Resource clusterResource, ResourceLimits myLimits) {
|
||||
/*
|
||||
* Set head-room of a given child, limit =
|
||||
* min(minimum-of-limit-of-this-queue-and-ancestors, this.max) - this.used
|
||||
* + child.used. To avoid any of this queue's and its ancestors' limit
|
||||
* being violated
|
||||
*/
|
||||
Resource myCurrentLimit =
|
||||
getCurrentResourceLimit(clusterResource, myLimits);
|
||||
// My available resource = my-current-limit - my-used-resource
|
||||
Resource myMaxAvailableResource = Resources.subtract(myCurrentLimit,
|
||||
getUsedResources());
|
||||
// Child's limit = my-available-resource + resource-already-used-by-child
|
||||
Resource clusterResource, ResourceLimits parentLimits) {
|
||||
// Set resource-limit of a given child, child.limit =
|
||||
// min(my.limit - my.used + child.used, child.max)
|
||||
|
||||
// Parent available resource = parent-limit - parent-used-resource
|
||||
Resource parentMaxAvailableResource =
|
||||
Resources.subtract(parentLimits.getLimit(), getUsedResources());
|
||||
|
||||
// Child's limit = parent-available-resource + child-used
|
||||
Resource childLimit =
|
||||
Resources.add(myMaxAvailableResource, child.getUsedResources());
|
||||
|
||||
Resources.add(parentMaxAvailableResource, child.getUsedResources());
|
||||
|
||||
// Get child's max resource
|
||||
Resource childConfiguredMaxResource =
|
||||
Resources.multiplyAndNormalizeDown(resourceCalculator, labelManager
|
||||
.getResourceByLabel(RMNodeLabelsManager.NO_LABEL, clusterResource),
|
||||
child.getAbsoluteMaximumCapacity(), minimumAllocation);
|
||||
|
||||
// Child's limit should be capped by child configured max resource
|
||||
childLimit =
|
||||
Resources.min(resourceCalculator, clusterResource, childLimit,
|
||||
childConfiguredMaxResource);
|
||||
|
||||
// Normalize before return
|
||||
childLimit =
|
||||
Resources.roundDown(resourceCalculator, childLimit, minimumAllocation);
|
||||
|
||||
return new ResourceLimits(childLimit);
|
||||
}
|
||||
|
||||
private synchronized CSAssignment assignContainersToChildQueues(
|
||||
Resource cluster, FiCaSchedulerNode node, boolean needToUnreserve,
|
||||
ResourceLimits limits) {
|
||||
Resource cluster, FiCaSchedulerNode node, ResourceLimits limits) {
|
||||
CSAssignment assignment =
|
||||
new CSAssignment(Resources.createResource(0, 0), NodeType.NODE_LOCAL);
|
||||
|
||||
|
@ -573,9 +507,7 @@ public class ParentQueue extends AbstractCSQueue {
|
|||
ResourceLimits childLimits =
|
||||
getResourceLimitsOfChild(childQueue, cluster, limits);
|
||||
|
||||
assignment =
|
||||
childQueue.assignContainers(cluster, node, needToUnreserve,
|
||||
childLimits);
|
||||
assignment = childQueue.assignContainers(cluster, node, childLimits);
|
||||
if(LOG.isDebugEnabled()) {
|
||||
LOG.debug("Assigned to queue: " + childQueue.getQueuePath() +
|
||||
" stats: " + childQueue + " --> " +
|
||||
|
|
|
@ -274,7 +274,8 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
|
|||
}
|
||||
|
||||
synchronized public NodeId getNodeIdToUnreserve(Priority priority,
|
||||
Resource capability) {
|
||||
Resource resourceNeedUnreserve, ResourceCalculator rc,
|
||||
Resource clusterResource) {
|
||||
|
||||
// first go around make this algorithm simple and just grab first
|
||||
// reservation that has enough resources
|
||||
|
@ -283,16 +284,19 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
|
|||
|
||||
if ((reservedContainers != null) && (!reservedContainers.isEmpty())) {
|
||||
for (Map.Entry<NodeId, RMContainer> entry : reservedContainers.entrySet()) {
|
||||
NodeId nodeId = entry.getKey();
|
||||
Resource containerResource = entry.getValue().getContainer().getResource();
|
||||
|
||||
// make sure we unreserve one with at least the same amount of
|
||||
// resources, otherwise could affect capacity limits
|
||||
if (Resources.fitsIn(capability, entry.getValue().getContainer()
|
||||
.getResource())) {
|
||||
if (Resources.lessThanOrEqual(rc, clusterResource,
|
||||
resourceNeedUnreserve, containerResource)) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("unreserving node with reservation size: "
|
||||
+ entry.getValue().getContainer().getResource()
|
||||
+ " in order to allocate container with size: " + capability);
|
||||
+ containerResource
|
||||
+ " in order to allocate container with size: " + resourceNeedUnreserve);
|
||||
}
|
||||
return entry.getKey();
|
||||
return nodeId;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -611,7 +611,7 @@ public class TestApplicationLimits {
|
|||
app_0_0.updateResourceRequests(app_0_0_requests);
|
||||
|
||||
// Schedule to compute
|
||||
queue.assignContainers(clusterResource, node_0, false, new ResourceLimits(
|
||||
queue.assignContainers(clusterResource, node_0, new ResourceLimits(
|
||||
clusterResource));
|
||||
Resource expectedHeadroom = Resources.createResource(10*16*GB, 1);
|
||||
assertEquals(expectedHeadroom, app_0_0.getHeadroom());
|
||||
|
@ -631,7 +631,7 @@ public class TestApplicationLimits {
|
|||
app_0_1.updateResourceRequests(app_0_1_requests);
|
||||
|
||||
// Schedule to compute
|
||||
queue.assignContainers(clusterResource, node_0, false, new ResourceLimits(
|
||||
queue.assignContainers(clusterResource, node_0, new ResourceLimits(
|
||||
clusterResource)); // Schedule to compute
|
||||
assertEquals(expectedHeadroom, app_0_0.getHeadroom());
|
||||
assertEquals(expectedHeadroom, app_0_1.getHeadroom());// no change
|
||||
|
@ -651,7 +651,7 @@ public class TestApplicationLimits {
|
|||
app_1_0.updateResourceRequests(app_1_0_requests);
|
||||
|
||||
// Schedule to compute
|
||||
queue.assignContainers(clusterResource, node_0, false, new ResourceLimits(
|
||||
queue.assignContainers(clusterResource, node_0, new ResourceLimits(
|
||||
clusterResource)); // Schedule to compute
|
||||
expectedHeadroom = Resources.createResource(10*16*GB / 2, 1); // changes
|
||||
assertEquals(expectedHeadroom, app_0_0.getHeadroom());
|
||||
|
@ -660,7 +660,7 @@ public class TestApplicationLimits {
|
|||
|
||||
// Now reduce cluster size and check for the smaller headroom
|
||||
clusterResource = Resources.createResource(90*16*GB);
|
||||
queue.assignContainers(clusterResource, node_0, false, new ResourceLimits(
|
||||
queue.assignContainers(clusterResource, node_0, new ResourceLimits(
|
||||
clusterResource)); // Schedule to compute
|
||||
expectedHeadroom = Resources.createResource(9*16*GB / 2, 1); // changes
|
||||
assertEquals(expectedHeadroom, app_0_0.getHeadroom());
|
||||
|
|
|
@ -125,6 +125,7 @@ import org.apache.hadoop.yarn.util.resource.Resources;
|
|||
import org.junit.After;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.ComparisonFailure;
|
||||
import org.junit.Test;
|
||||
import org.mockito.Mockito;
|
||||
|
||||
|
@ -2483,6 +2484,64 @@ public class TestCapacityScheduler {
|
|||
Assert.assertEquals(30 * GB,
|
||||
am1.doHeartbeat().getAvailableResources().getMemory());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testParentQueueMaxCapsAreRespected() throws Exception {
|
||||
/*
|
||||
* Queue tree:
|
||||
* Root
|
||||
* / \
|
||||
* A B
|
||||
* / \
|
||||
* A1 A2
|
||||
*/
|
||||
CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration();
|
||||
csConf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {"a", "b"});
|
||||
csConf.setCapacity(A, 50);
|
||||
csConf.setMaximumCapacity(A, 50);
|
||||
csConf.setCapacity(B, 50);
|
||||
|
||||
// Define 2nd-level queues
|
||||
csConf.setQueues(A, new String[] {"a1", "a2"});
|
||||
csConf.setCapacity(A1, 50);
|
||||
csConf.setUserLimitFactor(A1, 100.0f);
|
||||
csConf.setCapacity(A2, 50);
|
||||
csConf.setUserLimitFactor(A2, 100.0f);
|
||||
csConf.setCapacity(B1, B1_CAPACITY);
|
||||
csConf.setUserLimitFactor(B1, 100.0f);
|
||||
|
||||
YarnConfiguration conf = new YarnConfiguration(csConf);
|
||||
conf.setBoolean(CapacitySchedulerConfiguration.ENABLE_USER_METRICS, true);
|
||||
|
||||
MemoryRMStateStore memStore = new MemoryRMStateStore();
|
||||
memStore.init(conf);
|
||||
MockRM rm1 = new MockRM(conf, memStore);
|
||||
rm1.start();
|
||||
MockNM nm1 =
|
||||
new MockNM("127.0.0.1:1234", 24 * GB, rm1.getResourceTrackerService());
|
||||
nm1.registerNode();
|
||||
|
||||
// Launch app1 in a1, resource usage is 1GB (am) + 4GB * 2 = 9GB
|
||||
RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a1");
|
||||
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
|
||||
waitContainerAllocated(am1, 4 * GB, 2, 2, rm1, nm1);
|
||||
|
||||
// Try to launch app2 in a2, asked 2GB, should success
|
||||
RMApp app2 = rm1.submitApp(2 * GB, "app", "user", null, "a2");
|
||||
MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm1);
|
||||
try {
|
||||
// Try to allocate a container, a's usage=11G/max=12
|
||||
// a1's usage=9G/max=12
|
||||
// a2's usage=2G/max=12
|
||||
// In this case, if a2 asked 2G, should fail.
|
||||
waitContainerAllocated(am2, 2 * GB, 1, 2, rm1, nm1);
|
||||
} catch (AssertionError failure) {
|
||||
// Expected, return;
|
||||
return;
|
||||
}
|
||||
Assert.fail("Shouldn't successfully allocate containers for am2, "
|
||||
+ "queue-a's max capacity will be violated if container allocated");
|
||||
}
|
||||
|
||||
private void setMaxAllocMb(Configuration conf, int maxAllocMb) {
|
||||
conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
|
||||
|
|
|
@ -20,7 +20,6 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
|
|||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.mockito.Matchers.any;
|
||||
import static org.mockito.Matchers.anyBoolean;
|
||||
import static org.mockito.Matchers.eq;
|
||||
import static org.mockito.Mockito.doAnswer;
|
||||
import static org.mockito.Mockito.doNothing;
|
||||
|
@ -145,7 +144,7 @@ public class TestChildQueueOrder {
|
|||
if (allocation > 0) {
|
||||
doReturn(new CSAssignment(Resources.none(), type)).
|
||||
when(queue)
|
||||
.assignContainers(eq(clusterResource), eq(node), anyBoolean(),
|
||||
.assignContainers(eq(clusterResource), eq(node),
|
||||
any(ResourceLimits.class));
|
||||
|
||||
// Mock the node's resource availability
|
||||
|
@ -157,7 +156,7 @@ public class TestChildQueueOrder {
|
|||
return new CSAssignment(allocatedResource, type);
|
||||
}
|
||||
}).
|
||||
when(queue).assignContainers(eq(clusterResource), eq(node), anyBoolean(),
|
||||
when(queue).assignContainers(eq(clusterResource), eq(node),
|
||||
any(ResourceLimits.class));
|
||||
doNothing().when(node).releaseContainer(any(Container.class));
|
||||
}
|
||||
|
@ -274,7 +273,7 @@ public class TestChildQueueOrder {
|
|||
stubQueueAllocation(b, clusterResource, node_0, 0*GB);
|
||||
stubQueueAllocation(c, clusterResource, node_0, 0*GB);
|
||||
stubQueueAllocation(d, clusterResource, node_0, 0*GB);
|
||||
root.assignContainers(clusterResource, node_0, false, new ResourceLimits(
|
||||
root.assignContainers(clusterResource, node_0, new ResourceLimits(
|
||||
clusterResource));
|
||||
for(int i=0; i < 2; i++)
|
||||
{
|
||||
|
@ -282,7 +281,7 @@ public class TestChildQueueOrder {
|
|||
stubQueueAllocation(b, clusterResource, node_0, 1*GB);
|
||||
stubQueueAllocation(c, clusterResource, node_0, 0*GB);
|
||||
stubQueueAllocation(d, clusterResource, node_0, 0*GB);
|
||||
root.assignContainers(clusterResource, node_0, false, new ResourceLimits(
|
||||
root.assignContainers(clusterResource, node_0, new ResourceLimits(
|
||||
clusterResource));
|
||||
}
|
||||
for(int i=0; i < 3; i++)
|
||||
|
@ -291,7 +290,7 @@ public class TestChildQueueOrder {
|
|||
stubQueueAllocation(b, clusterResource, node_0, 0*GB);
|
||||
stubQueueAllocation(c, clusterResource, node_0, 1*GB);
|
||||
stubQueueAllocation(d, clusterResource, node_0, 0*GB);
|
||||
root.assignContainers(clusterResource, node_0, false, new ResourceLimits(
|
||||
root.assignContainers(clusterResource, node_0, new ResourceLimits(
|
||||
clusterResource));
|
||||
}
|
||||
for(int i=0; i < 4; i++)
|
||||
|
@ -300,7 +299,7 @@ public class TestChildQueueOrder {
|
|||
stubQueueAllocation(b, clusterResource, node_0, 0*GB);
|
||||
stubQueueAllocation(c, clusterResource, node_0, 0*GB);
|
||||
stubQueueAllocation(d, clusterResource, node_0, 1*GB);
|
||||
root.assignContainers(clusterResource, node_0, false, new ResourceLimits(
|
||||
root.assignContainers(clusterResource, node_0, new ResourceLimits(
|
||||
clusterResource));
|
||||
}
|
||||
verifyQueueMetrics(a, 1*GB, clusterResource);
|
||||
|
@ -334,7 +333,7 @@ public class TestChildQueueOrder {
|
|||
stubQueueAllocation(b, clusterResource, node_0, 0*GB);
|
||||
stubQueueAllocation(c, clusterResource, node_0, 0*GB);
|
||||
stubQueueAllocation(d, clusterResource, node_0, 0*GB);
|
||||
root.assignContainers(clusterResource, node_0, false, new ResourceLimits(
|
||||
root.assignContainers(clusterResource, node_0, new ResourceLimits(
|
||||
clusterResource));
|
||||
}
|
||||
verifyQueueMetrics(a, 3*GB, clusterResource);
|
||||
|
@ -362,7 +361,7 @@ public class TestChildQueueOrder {
|
|||
stubQueueAllocation(b, clusterResource, node_0, 1*GB);
|
||||
stubQueueAllocation(c, clusterResource, node_0, 0*GB);
|
||||
stubQueueAllocation(d, clusterResource, node_0, 0*GB);
|
||||
root.assignContainers(clusterResource, node_0, false, new ResourceLimits(
|
||||
root.assignContainers(clusterResource, node_0, new ResourceLimits(
|
||||
clusterResource));
|
||||
verifyQueueMetrics(a, 2*GB, clusterResource);
|
||||
verifyQueueMetrics(b, 3*GB, clusterResource);
|
||||
|
@ -389,7 +388,7 @@ public class TestChildQueueOrder {
|
|||
stubQueueAllocation(b, clusterResource, node_0, 0*GB);
|
||||
stubQueueAllocation(c, clusterResource, node_0, 0*GB);
|
||||
stubQueueAllocation(d, clusterResource, node_0, 0*GB);
|
||||
root.assignContainers(clusterResource, node_0, false, new ResourceLimits(
|
||||
root.assignContainers(clusterResource, node_0, new ResourceLimits(
|
||||
clusterResource));
|
||||
verifyQueueMetrics(a, 3*GB, clusterResource);
|
||||
verifyQueueMetrics(b, 2*GB, clusterResource);
|
||||
|
@ -404,13 +403,13 @@ public class TestChildQueueOrder {
|
|||
stubQueueAllocation(b, clusterResource, node_0, 1*GB);
|
||||
stubQueueAllocation(c, clusterResource, node_0, 0*GB);
|
||||
stubQueueAllocation(d, clusterResource, node_0, 1*GB);
|
||||
root.assignContainers(clusterResource, node_0, false, new ResourceLimits(
|
||||
root.assignContainers(clusterResource, node_0, new ResourceLimits(
|
||||
clusterResource));
|
||||
InOrder allocationOrder = inOrder(d,b);
|
||||
allocationOrder.verify(d).assignContainers(eq(clusterResource),
|
||||
any(FiCaSchedulerNode.class), anyBoolean(), any(ResourceLimits.class));
|
||||
any(FiCaSchedulerNode.class), any(ResourceLimits.class));
|
||||
allocationOrder.verify(b).assignContainers(eq(clusterResource),
|
||||
any(FiCaSchedulerNode.class), anyBoolean(), any(ResourceLimits.class));
|
||||
any(FiCaSchedulerNode.class), any(ResourceLimits.class));
|
||||
verifyQueueMetrics(a, 3*GB, clusterResource);
|
||||
verifyQueueMetrics(b, 2*GB, clusterResource);
|
||||
verifyQueueMetrics(c, 3*GB, clusterResource);
|
||||
|
|
|
@ -350,8 +350,8 @@ public class TestLeafQueue {
|
|||
// Start testing...
|
||||
|
||||
// Only 1 container
|
||||
a.assignContainers(clusterResource, node_0, false,
|
||||
new ResourceLimits(clusterResource));
|
||||
a.assignContainers(clusterResource, node_0, new ResourceLimits(
|
||||
clusterResource));
|
||||
assertEquals(
|
||||
(int)(node_0.getTotalResource().getMemory() * a.getCapacity()) - (1*GB),
|
||||
a.getMetrics().getAvailableMB());
|
||||
|
@ -486,7 +486,7 @@ public class TestLeafQueue {
|
|||
// Start testing...
|
||||
|
||||
// Only 1 container
|
||||
a.assignContainers(clusterResource, node_0, false,
|
||||
a.assignContainers(clusterResource, node_0,
|
||||
new ResourceLimits(clusterResource));
|
||||
assertEquals(1*GB, a.getUsedResources().getMemory());
|
||||
assertEquals(1*GB, app_0.getCurrentConsumption().getMemory());
|
||||
|
@ -497,7 +497,7 @@ public class TestLeafQueue {
|
|||
|
||||
// Also 2nd -> minCapacity = 1024 since (.1 * 8G) < minAlloc, also
|
||||
// you can get one container more than user-limit
|
||||
a.assignContainers(clusterResource, node_0, false,
|
||||
a.assignContainers(clusterResource, node_0,
|
||||
new ResourceLimits(clusterResource));
|
||||
assertEquals(2*GB, a.getUsedResources().getMemory());
|
||||
assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
|
||||
|
@ -506,7 +506,7 @@ public class TestLeafQueue {
|
|||
assertEquals(2*GB, a.getMetrics().getAllocatedMB());
|
||||
|
||||
// Can't allocate 3rd due to user-limit
|
||||
a.assignContainers(clusterResource, node_0, false,
|
||||
a.assignContainers(clusterResource, node_0,
|
||||
new ResourceLimits(clusterResource));
|
||||
assertEquals(2*GB, a.getUsedResources().getMemory());
|
||||
assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
|
||||
|
@ -516,7 +516,7 @@ public class TestLeafQueue {
|
|||
|
||||
// Bump up user-limit-factor, now allocate should work
|
||||
a.setUserLimitFactor(10);
|
||||
a.assignContainers(clusterResource, node_0, false,
|
||||
a.assignContainers(clusterResource, node_0,
|
||||
new ResourceLimits(clusterResource));
|
||||
assertEquals(3*GB, a.getUsedResources().getMemory());
|
||||
assertEquals(3*GB, app_0.getCurrentConsumption().getMemory());
|
||||
|
@ -525,7 +525,7 @@ public class TestLeafQueue {
|
|||
assertEquals(3*GB, a.getMetrics().getAllocatedMB());
|
||||
|
||||
// One more should work, for app_1, due to user-limit-factor
|
||||
a.assignContainers(clusterResource, node_0, false,
|
||||
a.assignContainers(clusterResource, node_0,
|
||||
new ResourceLimits(clusterResource));
|
||||
assertEquals(4*GB, a.getUsedResources().getMemory());
|
||||
assertEquals(3*GB, app_0.getCurrentConsumption().getMemory());
|
||||
|
@ -536,8 +536,8 @@ public class TestLeafQueue {
|
|||
// Test max-capacity
|
||||
// Now - no more allocs since we are at max-cap
|
||||
a.setMaxCapacity(0.5f);
|
||||
a.assignContainers(clusterResource, node_0, false,
|
||||
new ResourceLimits(clusterResource));
|
||||
a.assignContainers(clusterResource, node_0, new ResourceLimits(
|
||||
clusterResource));
|
||||
assertEquals(4*GB, a.getUsedResources().getMemory());
|
||||
assertEquals(3*GB, app_0.getCurrentConsumption().getMemory());
|
||||
assertEquals(1*GB, app_1.getCurrentConsumption().getMemory());
|
||||
|
@ -652,21 +652,21 @@ public class TestLeafQueue {
|
|||
// recordFactory)));
|
||||
|
||||
// 1 container to user_0
|
||||
a.assignContainers(clusterResource, node_0, false,
|
||||
a.assignContainers(clusterResource, node_0,
|
||||
new ResourceLimits(clusterResource));
|
||||
assertEquals(2*GB, a.getUsedResources().getMemory());
|
||||
assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
|
||||
assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
|
||||
|
||||
// Again one to user_0 since he hasn't exceeded user limit yet
|
||||
a.assignContainers(clusterResource, node_0, false,
|
||||
a.assignContainers(clusterResource, node_0,
|
||||
new ResourceLimits(clusterResource));
|
||||
assertEquals(3*GB, a.getUsedResources().getMemory());
|
||||
assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
|
||||
assertEquals(1*GB, app_1.getCurrentConsumption().getMemory());
|
||||
|
||||
// One more to user_0 since he is the only active user
|
||||
a.assignContainers(clusterResource, node_1, false,
|
||||
a.assignContainers(clusterResource, node_1,
|
||||
new ResourceLimits(clusterResource));
|
||||
assertEquals(4*GB, a.getUsedResources().getMemory());
|
||||
assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
|
||||
|
@ -718,7 +718,7 @@ public class TestLeafQueue {
|
|||
assertEquals("There should only be 1 active user!",
|
||||
1, qb.getActiveUsersManager().getNumActiveUsers());
|
||||
//get headroom
|
||||
qb.assignContainers(clusterResource, node_0, false,
|
||||
qb.assignContainers(clusterResource, node_0,
|
||||
new ResourceLimits(clusterResource));
|
||||
qb.computeUserLimitAndSetHeadroom(app_0, clusterResource, app_0
|
||||
.getResourceRequest(u0Priority, ResourceRequest.ANY).getCapability(),
|
||||
|
@ -738,7 +738,7 @@ public class TestLeafQueue {
|
|||
TestUtils.createResourceRequest(ResourceRequest.ANY, 4*GB, 1, true,
|
||||
u1Priority, recordFactory)));
|
||||
qb.submitApplicationAttempt(app_2, user_1);
|
||||
qb.assignContainers(clusterResource, node_1, false,
|
||||
qb.assignContainers(clusterResource, node_1,
|
||||
new ResourceLimits(clusterResource));
|
||||
qb.computeUserLimitAndSetHeadroom(app_0, clusterResource, app_0
|
||||
.getResourceRequest(u0Priority, ResourceRequest.ANY).getCapability(),
|
||||
|
@ -781,9 +781,9 @@ public class TestLeafQueue {
|
|||
u1Priority, recordFactory)));
|
||||
qb.submitApplicationAttempt(app_1, user_0);
|
||||
qb.submitApplicationAttempt(app_3, user_1);
|
||||
qb.assignContainers(clusterResource, node_0, false,
|
||||
qb.assignContainers(clusterResource, node_0,
|
||||
new ResourceLimits(clusterResource));
|
||||
qb.assignContainers(clusterResource, node_0, false,
|
||||
qb.assignContainers(clusterResource, node_0,
|
||||
new ResourceLimits(clusterResource));
|
||||
qb.computeUserLimitAndSetHeadroom(app_3, clusterResource, app_3
|
||||
.getResourceRequest(u1Priority, ResourceRequest.ANY).getCapability(),
|
||||
|
@ -802,7 +802,7 @@ public class TestLeafQueue {
|
|||
app_4.updateResourceRequests(Collections.singletonList(
|
||||
TestUtils.createResourceRequest(ResourceRequest.ANY, 6*GB, 1, true,
|
||||
u0Priority, recordFactory)));
|
||||
qb.assignContainers(clusterResource, node_1, false,
|
||||
qb.assignContainers(clusterResource, node_1,
|
||||
new ResourceLimits(clusterResource));
|
||||
qb.computeUserLimitAndSetHeadroom(app_4, clusterResource, app_4
|
||||
.getResourceRequest(u0Priority, ResourceRequest.ANY).getCapability(),
|
||||
|
@ -875,7 +875,7 @@ public class TestLeafQueue {
|
|||
TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 1, true,
|
||||
priority, recordFactory)));
|
||||
|
||||
a.assignContainers(clusterResource, node_0, false,
|
||||
a.assignContainers(clusterResource, node_0,
|
||||
new ResourceLimits(clusterResource));
|
||||
assertEquals(1*GB, a.getUsedResources().getMemory());
|
||||
assertEquals(1*GB, app_0.getCurrentConsumption().getMemory());
|
||||
|
@ -892,7 +892,7 @@ public class TestLeafQueue {
|
|||
TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 2, true,
|
||||
priority, recordFactory)));
|
||||
|
||||
a.assignContainers(clusterResource, node_0, false,
|
||||
a.assignContainers(clusterResource, node_0,
|
||||
new ResourceLimits(clusterResource));
|
||||
assertEquals(2*GB, a.getUsedResources().getMemory());
|
||||
assertEquals(1*GB, app_0.getCurrentConsumption().getMemory());
|
||||
|
@ -981,7 +981,7 @@ public class TestLeafQueue {
|
|||
1, a.getActiveUsersManager().getNumActiveUsers());
|
||||
|
||||
// 1 container to user_0
|
||||
a.assignContainers(clusterResource, node_0, false,
|
||||
a.assignContainers(clusterResource, node_0,
|
||||
new ResourceLimits(clusterResource));
|
||||
assertEquals(2*GB, a.getUsedResources().getMemory());
|
||||
assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
|
||||
|
@ -992,7 +992,7 @@ public class TestLeafQueue {
|
|||
// the application is not yet active
|
||||
|
||||
// Again one to user_0 since he hasn't exceeded user limit yet
|
||||
a.assignContainers(clusterResource, node_0, false,
|
||||
a.assignContainers(clusterResource, node_0,
|
||||
new ResourceLimits(clusterResource));
|
||||
assertEquals(3*GB, a.getUsedResources().getMemory());
|
||||
assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
|
||||
|
@ -1009,7 +1009,7 @@ public class TestLeafQueue {
|
|||
|
||||
// No more to user_0 since he is already over user-limit
|
||||
// and no more containers to queue since it's already at max-cap
|
||||
a.assignContainers(clusterResource, node_1, false,
|
||||
a.assignContainers(clusterResource, node_1,
|
||||
new ResourceLimits(clusterResource));
|
||||
assertEquals(3*GB, a.getUsedResources().getMemory());
|
||||
assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
|
||||
|
@ -1023,7 +1023,7 @@ public class TestLeafQueue {
|
|||
TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 0, true,
|
||||
priority, recordFactory)));
|
||||
assertEquals(1, a.getActiveUsersManager().getNumActiveUsers());
|
||||
a.assignContainers(clusterResource, node_1, false,
|
||||
a.assignContainers(clusterResource, node_1,
|
||||
new ResourceLimits(clusterResource));
|
||||
assertEquals(0*GB, app_2.getHeadroom().getMemory()); // hit queue max-cap
|
||||
}
|
||||
|
@ -1094,7 +1094,7 @@ public class TestLeafQueue {
|
|||
*/
|
||||
|
||||
// Only 1 container
|
||||
a.assignContainers(clusterResource, node_0, false,
|
||||
a.assignContainers(clusterResource, node_0,
|
||||
new ResourceLimits(clusterResource));
|
||||
assertEquals(1*GB, a.getUsedResources().getMemory());
|
||||
assertEquals(1*GB, app_0.getCurrentConsumption().getMemory());
|
||||
|
@ -1102,7 +1102,7 @@ public class TestLeafQueue {
|
|||
|
||||
// Also 2nd -> minCapacity = 1024 since (.1 * 8G) < minAlloc, also
|
||||
// you can get one container more than user-limit
|
||||
a.assignContainers(clusterResource, node_0, false,
|
||||
a.assignContainers(clusterResource, node_0,
|
||||
new ResourceLimits(clusterResource));
|
||||
assertEquals(2*GB, a.getUsedResources().getMemory());
|
||||
assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
|
||||
|
@ -1110,7 +1110,7 @@ public class TestLeafQueue {
|
|||
|
||||
// Can't allocate 3rd due to user-limit
|
||||
a.setUserLimit(25);
|
||||
a.assignContainers(clusterResource, node_0, false,
|
||||
a.assignContainers(clusterResource, node_0,
|
||||
new ResourceLimits(clusterResource));
|
||||
assertEquals(2*GB, a.getUsedResources().getMemory());
|
||||
assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
|
||||
|
@ -1129,7 +1129,7 @@ public class TestLeafQueue {
|
|||
// Now allocations should goto app_2 since
|
||||
// user_0 is at limit inspite of high user-limit-factor
|
||||
a.setUserLimitFactor(10);
|
||||
a.assignContainers(clusterResource, node_0, false,
|
||||
a.assignContainers(clusterResource, node_0,
|
||||
new ResourceLimits(clusterResource));
|
||||
assertEquals(5*GB, a.getUsedResources().getMemory());
|
||||
assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
|
||||
|
@ -1139,7 +1139,7 @@ public class TestLeafQueue {
|
|||
|
||||
// Now allocations should goto app_0 since
|
||||
// user_0 is at user-limit not above it
|
||||
a.assignContainers(clusterResource, node_0, false,
|
||||
a.assignContainers(clusterResource, node_0,
|
||||
new ResourceLimits(clusterResource));
|
||||
assertEquals(6*GB, a.getUsedResources().getMemory());
|
||||
assertEquals(3*GB, app_0.getCurrentConsumption().getMemory());
|
||||
|
@ -1150,7 +1150,7 @@ public class TestLeafQueue {
|
|||
// Test max-capacity
|
||||
// Now - no more allocs since we are at max-cap
|
||||
a.setMaxCapacity(0.5f);
|
||||
a.assignContainers(clusterResource, node_0, false,
|
||||
a.assignContainers(clusterResource, node_0,
|
||||
new ResourceLimits(clusterResource));
|
||||
assertEquals(6*GB, a.getUsedResources().getMemory());
|
||||
assertEquals(3*GB, app_0.getCurrentConsumption().getMemory());
|
||||
|
@ -1162,7 +1162,7 @@ public class TestLeafQueue {
|
|||
// Now, allocations should goto app_3 since it's under user-limit
|
||||
a.setMaxCapacity(1.0f);
|
||||
a.setUserLimitFactor(1);
|
||||
a.assignContainers(clusterResource, node_0, false,
|
||||
a.assignContainers(clusterResource, node_0,
|
||||
new ResourceLimits(clusterResource));
|
||||
assertEquals(7*GB, a.getUsedResources().getMemory());
|
||||
assertEquals(3*GB, app_0.getCurrentConsumption().getMemory());
|
||||
|
@ -1171,7 +1171,7 @@ public class TestLeafQueue {
|
|||
assertEquals(1*GB, app_3.getCurrentConsumption().getMemory());
|
||||
|
||||
// Now we should assign to app_3 again since user_2 is under user-limit
|
||||
a.assignContainers(clusterResource, node_0, false,
|
||||
a.assignContainers(clusterResource, node_0,
|
||||
new ResourceLimits(clusterResource));
|
||||
assertEquals(8*GB, a.getUsedResources().getMemory());
|
||||
assertEquals(3*GB, app_0.getCurrentConsumption().getMemory());
|
||||
|
@ -1271,7 +1271,7 @@ public class TestLeafQueue {
|
|||
// Start testing...
|
||||
|
||||
// Only 1 container
|
||||
a.assignContainers(clusterResource, node_0, false,
|
||||
a.assignContainers(clusterResource, node_0,
|
||||
new ResourceLimits(clusterResource));
|
||||
assertEquals(1*GB, a.getUsedResources().getMemory());
|
||||
assertEquals(1*GB, app_0.getCurrentConsumption().getMemory());
|
||||
|
@ -1282,7 +1282,7 @@ public class TestLeafQueue {
|
|||
|
||||
// Also 2nd -> minCapacity = 1024 since (.1 * 8G) < minAlloc, also
|
||||
// you can get one container more than user-limit
|
||||
a.assignContainers(clusterResource, node_0, false,
|
||||
a.assignContainers(clusterResource, node_0,
|
||||
new ResourceLimits(clusterResource));
|
||||
assertEquals(2*GB, a.getUsedResources().getMemory());
|
||||
assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
|
||||
|
@ -1291,7 +1291,7 @@ public class TestLeafQueue {
|
|||
assertEquals(2*GB, a.getMetrics().getAllocatedMB());
|
||||
|
||||
// Now, reservation should kick in for app_1
|
||||
a.assignContainers(clusterResource, node_0, false,
|
||||
a.assignContainers(clusterResource, node_0,
|
||||
new ResourceLimits(clusterResource));
|
||||
assertEquals(6*GB, a.getUsedResources().getMemory());
|
||||
assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
|
||||
|
@ -1308,7 +1308,7 @@ public class TestLeafQueue {
|
|||
ContainerState.COMPLETE, "",
|
||||
ContainerExitStatus.KILLED_BY_RESOURCEMANAGER),
|
||||
RMContainerEventType.KILL, null, true);
|
||||
a.assignContainers(clusterResource, node_0, false,
|
||||
a.assignContainers(clusterResource, node_0,
|
||||
new ResourceLimits(clusterResource));
|
||||
assertEquals(5*GB, a.getUsedResources().getMemory());
|
||||
assertEquals(1*GB, app_0.getCurrentConsumption().getMemory());
|
||||
|
@ -1325,7 +1325,7 @@ public class TestLeafQueue {
|
|||
ContainerState.COMPLETE, "",
|
||||
ContainerExitStatus.KILLED_BY_RESOURCEMANAGER),
|
||||
RMContainerEventType.KILL, null, true);
|
||||
a.assignContainers(clusterResource, node_0, false,
|
||||
a.assignContainers(clusterResource, node_0,
|
||||
new ResourceLimits(clusterResource));
|
||||
assertEquals(4*GB, a.getUsedResources().getMemory());
|
||||
assertEquals(0*GB, app_0.getCurrentConsumption().getMemory());
|
||||
|
@ -1393,7 +1393,7 @@ public class TestLeafQueue {
|
|||
|
||||
// Start testing...
|
||||
|
||||
a.assignContainers(clusterResource, node_0, false,
|
||||
a.assignContainers(clusterResource, node_0,
|
||||
new ResourceLimits(clusterResource));
|
||||
assertEquals(2*GB, a.getUsedResources().getMemory());
|
||||
assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
|
||||
|
@ -1403,7 +1403,7 @@ public class TestLeafQueue {
|
|||
assertEquals(0*GB, a.getMetrics().getAvailableMB());
|
||||
|
||||
// Now, reservation should kick in for app_1
|
||||
a.assignContainers(clusterResource, node_0, false,
|
||||
a.assignContainers(clusterResource, node_0,
|
||||
new ResourceLimits(clusterResource));
|
||||
assertEquals(6*GB, a.getUsedResources().getMemory());
|
||||
assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
|
||||
|
@ -1417,7 +1417,7 @@ public class TestLeafQueue {
|
|||
// We do not need locality delay here
|
||||
doReturn(-1).when(a).getNodeLocalityDelay();
|
||||
|
||||
a.assignContainers(clusterResource, node_1, false,
|
||||
a.assignContainers(clusterResource, node_1,
|
||||
new ResourceLimits(clusterResource));
|
||||
assertEquals(10*GB, a.getUsedResources().getMemory());
|
||||
assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
|
||||
|
@ -1434,7 +1434,7 @@ public class TestLeafQueue {
|
|||
ContainerState.COMPLETE, "",
|
||||
ContainerExitStatus.KILLED_BY_RESOURCEMANAGER),
|
||||
RMContainerEventType.KILL, null, true);
|
||||
a.assignContainers(clusterResource, node_0, false,
|
||||
a.assignContainers(clusterResource, node_0,
|
||||
new ResourceLimits(clusterResource));
|
||||
assertEquals(8*GB, a.getUsedResources().getMemory());
|
||||
assertEquals(0*GB, app_0.getCurrentConsumption().getMemory());
|
||||
|
@ -1503,7 +1503,7 @@ public class TestLeafQueue {
|
|||
// Start testing...
|
||||
|
||||
// Only 1 container
|
||||
a.assignContainers(clusterResource, node_0, false,
|
||||
a.assignContainers(clusterResource, node_0,
|
||||
new ResourceLimits(clusterResource));
|
||||
assertEquals(1*GB, a.getUsedResources().getMemory());
|
||||
assertEquals(1*GB, app_0.getCurrentConsumption().getMemory());
|
||||
|
@ -1511,14 +1511,14 @@ public class TestLeafQueue {
|
|||
|
||||
// Also 2nd -> minCapacity = 1024 since (.1 * 8G) < minAlloc, also
|
||||
// you can get one container more than user-limit
|
||||
a.assignContainers(clusterResource, node_0, false,
|
||||
a.assignContainers(clusterResource, node_0,
|
||||
new ResourceLimits(clusterResource));
|
||||
assertEquals(2*GB, a.getUsedResources().getMemory());
|
||||
assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
|
||||
assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
|
||||
|
||||
// Now, reservation should kick in for app_1
|
||||
a.assignContainers(clusterResource, node_0, false,
|
||||
a.assignContainers(clusterResource, node_0,
|
||||
new ResourceLimits(clusterResource));
|
||||
assertEquals(6*GB, a.getUsedResources().getMemory());
|
||||
assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
|
||||
|
@ -1533,7 +1533,7 @@ public class TestLeafQueue {
|
|||
ContainerState.COMPLETE, "",
|
||||
ContainerExitStatus.KILLED_BY_RESOURCEMANAGER),
|
||||
RMContainerEventType.KILL, null, true);
|
||||
a.assignContainers(clusterResource, node_0, false,
|
||||
a.assignContainers(clusterResource, node_0,
|
||||
new ResourceLimits(clusterResource));
|
||||
assertEquals(5*GB, a.getUsedResources().getMemory());
|
||||
assertEquals(1*GB, app_0.getCurrentConsumption().getMemory());
|
||||
|
@ -1543,7 +1543,7 @@ public class TestLeafQueue {
|
|||
assertEquals(1, app_1.getReReservations(priority));
|
||||
|
||||
// Re-reserve
|
||||
a.assignContainers(clusterResource, node_0, false,
|
||||
a.assignContainers(clusterResource, node_0,
|
||||
new ResourceLimits(clusterResource));
|
||||
assertEquals(5*GB, a.getUsedResources().getMemory());
|
||||
assertEquals(1*GB, app_0.getCurrentConsumption().getMemory());
|
||||
|
@ -1553,7 +1553,7 @@ public class TestLeafQueue {
|
|||
assertEquals(2, app_1.getReReservations(priority));
|
||||
|
||||
// Try to schedule on node_1 now, should *move* the reservation
|
||||
a.assignContainers(clusterResource, node_1, false,
|
||||
a.assignContainers(clusterResource, node_1,
|
||||
new ResourceLimits(clusterResource));
|
||||
assertEquals(9*GB, a.getUsedResources().getMemory());
|
||||
assertEquals(1*GB, app_0.getCurrentConsumption().getMemory());
|
||||
|
@ -1571,7 +1571,7 @@ public class TestLeafQueue {
|
|||
ContainerState.COMPLETE, "",
|
||||
ContainerExitStatus.KILLED_BY_RESOURCEMANAGER),
|
||||
RMContainerEventType.KILL, null, true);
|
||||
CSAssignment assignment = a.assignContainers(clusterResource, node_0, false,
|
||||
CSAssignment assignment = a.assignContainers(clusterResource, node_0,
|
||||
new ResourceLimits(clusterResource));
|
||||
assertEquals(8*GB, a.getUsedResources().getMemory());
|
||||
assertEquals(0*GB, app_0.getCurrentConsumption().getMemory());
|
||||
|
@ -1643,7 +1643,7 @@ public class TestLeafQueue {
|
|||
CSAssignment assignment = null;
|
||||
|
||||
// Start with off switch, shouldn't allocate due to delay scheduling
|
||||
assignment = a.assignContainers(clusterResource, node_2, false,
|
||||
assignment = a.assignContainers(clusterResource, node_2,
|
||||
new ResourceLimits(clusterResource));
|
||||
verify(app_0, never()).allocate(any(NodeType.class), eq(node_2),
|
||||
any(Priority.class), any(ResourceRequest.class), any(Container.class));
|
||||
|
@ -1652,7 +1652,7 @@ public class TestLeafQueue {
|
|||
assertEquals(NodeType.NODE_LOCAL, assignment.getType()); // None->NODE_LOCAL
|
||||
|
||||
// Another off switch, shouldn't allocate due to delay scheduling
|
||||
assignment = a.assignContainers(clusterResource, node_2, false,
|
||||
assignment = a.assignContainers(clusterResource, node_2,
|
||||
new ResourceLimits(clusterResource));
|
||||
verify(app_0, never()).allocate(any(NodeType.class), eq(node_2),
|
||||
any(Priority.class), any(ResourceRequest.class), any(Container.class));
|
||||
|
@ -1661,7 +1661,7 @@ public class TestLeafQueue {
|
|||
assertEquals(NodeType.NODE_LOCAL, assignment.getType()); // None->NODE_LOCAL
|
||||
|
||||
// Another off switch, shouldn't allocate due to delay scheduling
|
||||
assignment = a.assignContainers(clusterResource, node_2, false,
|
||||
assignment = a.assignContainers(clusterResource, node_2,
|
||||
new ResourceLimits(clusterResource));
|
||||
verify(app_0, never()).allocate(any(NodeType.class), eq(node_2),
|
||||
any(Priority.class), any(ResourceRequest.class), any(Container.class));
|
||||
|
@ -1671,7 +1671,7 @@ public class TestLeafQueue {
|
|||
|
||||
// Another off switch, now we should allocate
|
||||
// since missedOpportunities=3 and reqdContainers=3
|
||||
assignment = a.assignContainers(clusterResource, node_2, false,
|
||||
assignment = a.assignContainers(clusterResource, node_2,
|
||||
new ResourceLimits(clusterResource));
|
||||
verify(app_0).allocate(eq(NodeType.OFF_SWITCH), eq(node_2),
|
||||
any(Priority.class), any(ResourceRequest.class), any(Container.class));
|
||||
|
@ -1680,7 +1680,7 @@ public class TestLeafQueue {
|
|||
assertEquals(NodeType.OFF_SWITCH, assignment.getType());
|
||||
|
||||
// NODE_LOCAL - node_0
|
||||
assignment = a.assignContainers(clusterResource, node_0, false,
|
||||
assignment = a.assignContainers(clusterResource, node_0,
|
||||
new ResourceLimits(clusterResource));
|
||||
verify(app_0).allocate(eq(NodeType.NODE_LOCAL), eq(node_0),
|
||||
any(Priority.class), any(ResourceRequest.class), any(Container.class));
|
||||
|
@ -1689,7 +1689,7 @@ public class TestLeafQueue {
|
|||
assertEquals(NodeType.NODE_LOCAL, assignment.getType());
|
||||
|
||||
// NODE_LOCAL - node_1
|
||||
assignment = a.assignContainers(clusterResource, node_1, false,
|
||||
assignment = a.assignContainers(clusterResource, node_1,
|
||||
new ResourceLimits(clusterResource));
|
||||
verify(app_0).allocate(eq(NodeType.NODE_LOCAL), eq(node_1),
|
||||
any(Priority.class), any(ResourceRequest.class), any(Container.class));
|
||||
|
@ -1718,14 +1718,14 @@ public class TestLeafQueue {
|
|||
doReturn(1).when(a).getNodeLocalityDelay();
|
||||
|
||||
// Shouldn't assign RACK_LOCAL yet
|
||||
assignment = a.assignContainers(clusterResource, node_3, false,
|
||||
assignment = a.assignContainers(clusterResource, node_3,
|
||||
new ResourceLimits(clusterResource));
|
||||
assertEquals(1, app_0.getSchedulingOpportunities(priority));
|
||||
assertEquals(2, app_0.getTotalRequiredResources(priority));
|
||||
assertEquals(NodeType.NODE_LOCAL, assignment.getType()); // None->NODE_LOCAL
|
||||
|
||||
// Should assign RACK_LOCAL now
|
||||
assignment = a.assignContainers(clusterResource, node_3, false,
|
||||
assignment = a.assignContainers(clusterResource, node_3,
|
||||
new ResourceLimits(clusterResource));
|
||||
verify(app_0).allocate(eq(NodeType.RACK_LOCAL), eq(node_3),
|
||||
any(Priority.class), any(ResourceRequest.class), any(Container.class));
|
||||
|
@ -1807,7 +1807,7 @@ public class TestLeafQueue {
|
|||
|
||||
// Start with off switch, shouldn't allocate P1 due to delay scheduling
|
||||
// thus, no P2 either!
|
||||
a.assignContainers(clusterResource, node_2, false,
|
||||
a.assignContainers(clusterResource, node_2,
|
||||
new ResourceLimits(clusterResource));
|
||||
verify(app_0, never()).allocate(any(NodeType.class), eq(node_2),
|
||||
eq(priority_1), any(ResourceRequest.class), any(Container.class));
|
||||
|
@ -1820,7 +1820,7 @@ public class TestLeafQueue {
|
|||
|
||||
// Another off-switch, shouldn't allocate P1 due to delay scheduling
|
||||
// thus, no P2 either!
|
||||
a.assignContainers(clusterResource, node_2, false,
|
||||
a.assignContainers(clusterResource, node_2,
|
||||
new ResourceLimits(clusterResource));
|
||||
verify(app_0, never()).allocate(any(NodeType.class), eq(node_2),
|
||||
eq(priority_1), any(ResourceRequest.class), any(Container.class));
|
||||
|
@ -1832,7 +1832,7 @@ public class TestLeafQueue {
|
|||
assertEquals(1, app_0.getTotalRequiredResources(priority_2));
|
||||
|
||||
// Another off-switch, shouldn't allocate OFF_SWITCH P1
|
||||
a.assignContainers(clusterResource, node_2, false,
|
||||
a.assignContainers(clusterResource, node_2,
|
||||
new ResourceLimits(clusterResource));
|
||||
verify(app_0).allocate(eq(NodeType.OFF_SWITCH), eq(node_2),
|
||||
eq(priority_1), any(ResourceRequest.class), any(Container.class));
|
||||
|
@ -1844,7 +1844,7 @@ public class TestLeafQueue {
|
|||
assertEquals(1, app_0.getTotalRequiredResources(priority_2));
|
||||
|
||||
// Now, DATA_LOCAL for P1
|
||||
a.assignContainers(clusterResource, node_0, false,
|
||||
a.assignContainers(clusterResource, node_0,
|
||||
new ResourceLimits(clusterResource));
|
||||
verify(app_0).allocate(eq(NodeType.NODE_LOCAL), eq(node_0),
|
||||
eq(priority_1), any(ResourceRequest.class), any(Container.class));
|
||||
|
@ -1856,7 +1856,7 @@ public class TestLeafQueue {
|
|||
assertEquals(1, app_0.getTotalRequiredResources(priority_2));
|
||||
|
||||
// Now, OFF_SWITCH for P2
|
||||
a.assignContainers(clusterResource, node_1, false,
|
||||
a.assignContainers(clusterResource, node_1,
|
||||
new ResourceLimits(clusterResource));
|
||||
verify(app_0, never()).allocate(any(NodeType.class), eq(node_1),
|
||||
eq(priority_1), any(ResourceRequest.class), any(Container.class));
|
||||
|
@ -1933,7 +1933,7 @@ public class TestLeafQueue {
|
|||
app_0.updateResourceRequests(app_0_requests_0);
|
||||
|
||||
// NODE_LOCAL - node_0_1
|
||||
a.assignContainers(clusterResource, node_0_0, false,
|
||||
a.assignContainers(clusterResource, node_0_0,
|
||||
new ResourceLimits(clusterResource));
|
||||
verify(app_0).allocate(eq(NodeType.NODE_LOCAL), eq(node_0_0),
|
||||
any(Priority.class), any(ResourceRequest.class), any(Container.class));
|
||||
|
@ -1942,7 +1942,7 @@ public class TestLeafQueue {
|
|||
|
||||
// No allocation on node_1_0 even though it's node/rack local since
|
||||
// required(ANY) == 0
|
||||
a.assignContainers(clusterResource, node_1_0, false,
|
||||
a.assignContainers(clusterResource, node_1_0,
|
||||
new ResourceLimits(clusterResource));
|
||||
verify(app_0, never()).allocate(any(NodeType.class), eq(node_1_0),
|
||||
any(Priority.class), any(ResourceRequest.class), any(Container.class));
|
||||
|
@ -1959,7 +1959,7 @@ public class TestLeafQueue {
|
|||
|
||||
// No allocation on node_0_1 even though it's node/rack local since
|
||||
// required(rack_1) == 0
|
||||
a.assignContainers(clusterResource, node_0_1, false,
|
||||
a.assignContainers(clusterResource, node_0_1,
|
||||
new ResourceLimits(clusterResource));
|
||||
verify(app_0, never()).allocate(any(NodeType.class), eq(node_1_0),
|
||||
any(Priority.class), any(ResourceRequest.class), any(Container.class));
|
||||
|
@ -1967,7 +1967,7 @@ public class TestLeafQueue {
|
|||
assertEquals(1, app_0.getTotalRequiredResources(priority));
|
||||
|
||||
// NODE_LOCAL - node_1
|
||||
a.assignContainers(clusterResource, node_1_0, false,
|
||||
a.assignContainers(clusterResource, node_1_0,
|
||||
new ResourceLimits(clusterResource));
|
||||
verify(app_0).allocate(eq(NodeType.NODE_LOCAL), eq(node_1_0),
|
||||
any(Priority.class), any(ResourceRequest.class), any(Container.class));
|
||||
|
@ -2220,7 +2220,7 @@ public class TestLeafQueue {
|
|||
|
||||
// node_0_1
|
||||
// Shouldn't allocate since RR(rack_0) = null && RR(ANY) = relax: false
|
||||
a.assignContainers(clusterResource, node_0_1, false,
|
||||
a.assignContainers(clusterResource, node_0_1,
|
||||
new ResourceLimits(clusterResource));
|
||||
verify(app_0, never()).allocate(any(NodeType.class), eq(node_0_1),
|
||||
any(Priority.class), any(ResourceRequest.class), any(Container.class));
|
||||
|
@ -2243,7 +2243,7 @@ public class TestLeafQueue {
|
|||
|
||||
// node_1_1
|
||||
// Shouldn't allocate since RR(rack_1) = relax: false
|
||||
a.assignContainers(clusterResource, node_1_1, false,
|
||||
a.assignContainers(clusterResource, node_1_1,
|
||||
new ResourceLimits(clusterResource));
|
||||
verify(app_0, never()).allocate(any(NodeType.class), eq(node_0_1),
|
||||
any(Priority.class), any(ResourceRequest.class), any(Container.class));
|
||||
|
@ -2274,7 +2274,7 @@ public class TestLeafQueue {
|
|||
|
||||
// node_1_1
|
||||
// Shouldn't allocate since node_1_1 is blacklisted
|
||||
a.assignContainers(clusterResource, node_1_1, false,
|
||||
a.assignContainers(clusterResource, node_1_1,
|
||||
new ResourceLimits(clusterResource));
|
||||
verify(app_0, never()).allocate(any(NodeType.class), eq(node_1_1),
|
||||
any(Priority.class), any(ResourceRequest.class), any(Container.class));
|
||||
|
@ -2303,7 +2303,7 @@ public class TestLeafQueue {
|
|||
|
||||
// node_1_1
|
||||
// Shouldn't allocate since rack_1 is blacklisted
|
||||
a.assignContainers(clusterResource, node_1_1, false,
|
||||
a.assignContainers(clusterResource, node_1_1,
|
||||
new ResourceLimits(clusterResource));
|
||||
verify(app_0, never()).allocate(any(NodeType.class), eq(node_1_1),
|
||||
any(Priority.class), any(ResourceRequest.class), any(Container.class));
|
||||
|
@ -2330,7 +2330,7 @@ public class TestLeafQueue {
|
|||
// Blacklist: < host_0_0 > <----
|
||||
|
||||
// Now, should allocate since RR(rack_1) = relax: true
|
||||
a.assignContainers(clusterResource, node_1_1, false,
|
||||
a.assignContainers(clusterResource, node_1_1,
|
||||
new ResourceLimits(clusterResource));
|
||||
verify(app_0,never()).allocate(eq(NodeType.RACK_LOCAL), eq(node_1_1),
|
||||
any(Priority.class), any(ResourceRequest.class), any(Container.class));
|
||||
|
@ -2361,7 +2361,7 @@ public class TestLeafQueue {
|
|||
// host_1_0: 8G
|
||||
// host_1_1: 7G
|
||||
|
||||
a.assignContainers(clusterResource, node_1_0, false,
|
||||
a.assignContainers(clusterResource, node_1_0,
|
||||
new ResourceLimits(clusterResource));
|
||||
verify(app_0).allocate(eq(NodeType.NODE_LOCAL), eq(node_1_0),
|
||||
any(Priority.class), any(ResourceRequest.class), any(Container.class));
|
||||
|
@ -2444,7 +2444,7 @@ public class TestLeafQueue {
|
|||
recordFactory)));
|
||||
|
||||
try {
|
||||
a.assignContainers(clusterResource, node_0, false,
|
||||
a.assignContainers(clusterResource, node_0,
|
||||
new ResourceLimits(clusterResource));
|
||||
} catch (NullPointerException e) {
|
||||
Assert.fail("NPE when allocating container on node but "
|
||||
|
|
|
@ -156,7 +156,7 @@ public class TestParentQueue {
|
|||
// Next call - nothing
|
||||
if (allocation > 0) {
|
||||
doReturn(new CSAssignment(Resources.none(), type)).when(queue)
|
||||
.assignContainers(eq(clusterResource), eq(node), eq(false),
|
||||
.assignContainers(eq(clusterResource), eq(node),
|
||||
any(ResourceLimits.class));
|
||||
|
||||
// Mock the node's resource availability
|
||||
|
@ -167,8 +167,7 @@ public class TestParentQueue {
|
|||
|
||||
return new CSAssignment(allocatedResource, type);
|
||||
}
|
||||
}).
|
||||
when(queue).assignContainers(eq(clusterResource), eq(node), eq(false),
|
||||
}).when(queue).assignContainers(eq(clusterResource), eq(node),
|
||||
any(ResourceLimits.class));
|
||||
}
|
||||
|
||||
|
@ -232,7 +231,7 @@ when(queue).assignContainers(eq(clusterResource), eq(node), eq(false),
|
|||
// Simulate B returning a container on node_0
|
||||
stubQueueAllocation(a, clusterResource, node_0, 0*GB);
|
||||
stubQueueAllocation(b, clusterResource, node_0, 1*GB);
|
||||
root.assignContainers(clusterResource, node_0, false,
|
||||
root.assignContainers(clusterResource, node_0,
|
||||
new ResourceLimits(clusterResource));
|
||||
verifyQueueMetrics(a, 0*GB, clusterResource);
|
||||
verifyQueueMetrics(b, 1*GB, clusterResource);
|
||||
|
@ -240,13 +239,13 @@ when(queue).assignContainers(eq(clusterResource), eq(node), eq(false),
|
|||
// Now, A should get the scheduling opportunity since A=0G/6G, B=1G/14G
|
||||
stubQueueAllocation(a, clusterResource, node_1, 2*GB);
|
||||
stubQueueAllocation(b, clusterResource, node_1, 1*GB);
|
||||
root.assignContainers(clusterResource, node_1, false,
|
||||
root.assignContainers(clusterResource, node_1,
|
||||
new ResourceLimits(clusterResource));
|
||||
InOrder allocationOrder = inOrder(a, b);
|
||||
allocationOrder.verify(a).assignContainers(eq(clusterResource),
|
||||
any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits());
|
||||
any(FiCaSchedulerNode.class), anyResourceLimits());
|
||||
allocationOrder.verify(b).assignContainers(eq(clusterResource),
|
||||
any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits());
|
||||
any(FiCaSchedulerNode.class), anyResourceLimits());
|
||||
verifyQueueMetrics(a, 2*GB, clusterResource);
|
||||
verifyQueueMetrics(b, 2*GB, clusterResource);
|
||||
|
||||
|
@ -254,13 +253,13 @@ when(queue).assignContainers(eq(clusterResource), eq(node), eq(false),
|
|||
// since A has 2/6G while B has 2/14G
|
||||
stubQueueAllocation(a, clusterResource, node_0, 1*GB);
|
||||
stubQueueAllocation(b, clusterResource, node_0, 2*GB);
|
||||
root.assignContainers(clusterResource, node_0, false,
|
||||
root.assignContainers(clusterResource, node_0,
|
||||
new ResourceLimits(clusterResource));
|
||||
allocationOrder = inOrder(b, a);
|
||||
allocationOrder.verify(b).assignContainers(eq(clusterResource),
|
||||
any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits());
|
||||
any(FiCaSchedulerNode.class), anyResourceLimits());
|
||||
allocationOrder.verify(a).assignContainers(eq(clusterResource),
|
||||
any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits());
|
||||
any(FiCaSchedulerNode.class), anyResourceLimits());
|
||||
verifyQueueMetrics(a, 3*GB, clusterResource);
|
||||
verifyQueueMetrics(b, 4*GB, clusterResource);
|
||||
|
||||
|
@ -268,13 +267,13 @@ when(queue).assignContainers(eq(clusterResource), eq(node), eq(false),
|
|||
// since A has 3/6G while B has 4/14G
|
||||
stubQueueAllocation(a, clusterResource, node_0, 0*GB);
|
||||
stubQueueAllocation(b, clusterResource, node_0, 4*GB);
|
||||
root.assignContainers(clusterResource, node_0, false,
|
||||
root.assignContainers(clusterResource, node_0,
|
||||
new ResourceLimits(clusterResource));
|
||||
allocationOrder = inOrder(b, a);
|
||||
allocationOrder.verify(b).assignContainers(eq(clusterResource),
|
||||
any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits());
|
||||
any(FiCaSchedulerNode.class), anyResourceLimits());
|
||||
allocationOrder.verify(a).assignContainers(eq(clusterResource),
|
||||
any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits());
|
||||
any(FiCaSchedulerNode.class), anyResourceLimits());
|
||||
verifyQueueMetrics(a, 3*GB, clusterResource);
|
||||
verifyQueueMetrics(b, 8*GB, clusterResource);
|
||||
|
||||
|
@ -282,13 +281,13 @@ when(queue).assignContainers(eq(clusterResource), eq(node), eq(false),
|
|||
// since A has 3/6G while B has 8/14G
|
||||
stubQueueAllocation(a, clusterResource, node_1, 1*GB);
|
||||
stubQueueAllocation(b, clusterResource, node_1, 1*GB);
|
||||
root.assignContainers(clusterResource, node_1, false,
|
||||
root.assignContainers(clusterResource, node_1,
|
||||
new ResourceLimits(clusterResource));
|
||||
allocationOrder = inOrder(a, b);
|
||||
allocationOrder.verify(b).assignContainers(eq(clusterResource),
|
||||
any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits());
|
||||
any(FiCaSchedulerNode.class), anyResourceLimits());
|
||||
allocationOrder.verify(a).assignContainers(eq(clusterResource),
|
||||
any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits());
|
||||
any(FiCaSchedulerNode.class), anyResourceLimits());
|
||||
verifyQueueMetrics(a, 4*GB, clusterResource);
|
||||
verifyQueueMetrics(b, 9*GB, clusterResource);
|
||||
}
|
||||
|
@ -405,6 +404,22 @@ when(queue).assignContainers(eq(clusterResource), eq(node), eq(false),
|
|||
|
||||
@Test
|
||||
public void testMultiLevelQueues() throws Exception {
|
||||
/*
|
||||
* Structure of queue:
|
||||
* Root
|
||||
* ____________
|
||||
* / | \ \
|
||||
* A B C D
|
||||
* / | / | \ \
|
||||
* A1 A2 B1 B2 B3 C1
|
||||
* \
|
||||
* C11
|
||||
* \
|
||||
* C111
|
||||
* \
|
||||
* C1111
|
||||
*/
|
||||
|
||||
// Setup queue configs
|
||||
setupMultiLevelQueues(csConf);
|
||||
|
||||
|
@ -449,7 +464,7 @@ when(queue).assignContainers(eq(clusterResource), eq(node), eq(false),
|
|||
stubQueueAllocation(b, clusterResource, node_0, 0*GB);
|
||||
stubQueueAllocation(c, clusterResource, node_0, 1*GB);
|
||||
stubQueueAllocation(d, clusterResource, node_0, 0*GB);
|
||||
root.assignContainers(clusterResource, node_0, false,
|
||||
root.assignContainers(clusterResource, node_0,
|
||||
new ResourceLimits(clusterResource));
|
||||
verifyQueueMetrics(a, 0*GB, clusterResource);
|
||||
verifyQueueMetrics(b, 0*GB, clusterResource);
|
||||
|
@ -462,7 +477,7 @@ when(queue).assignContainers(eq(clusterResource), eq(node), eq(false),
|
|||
stubQueueAllocation(a, clusterResource, node_1, 0*GB);
|
||||
stubQueueAllocation(b2, clusterResource, node_1, 4*GB);
|
||||
stubQueueAllocation(c, clusterResource, node_1, 0*GB);
|
||||
root.assignContainers(clusterResource, node_1, false,
|
||||
root.assignContainers(clusterResource, node_1,
|
||||
new ResourceLimits(clusterResource));
|
||||
verifyQueueMetrics(a, 0*GB, clusterResource);
|
||||
verifyQueueMetrics(b, 4*GB, clusterResource);
|
||||
|
@ -474,15 +489,15 @@ when(queue).assignContainers(eq(clusterResource), eq(node), eq(false),
|
|||
stubQueueAllocation(a1, clusterResource, node_0, 1*GB);
|
||||
stubQueueAllocation(b3, clusterResource, node_0, 2*GB);
|
||||
stubQueueAllocation(c, clusterResource, node_0, 2*GB);
|
||||
root.assignContainers(clusterResource, node_0, false,
|
||||
root.assignContainers(clusterResource, node_0,
|
||||
new ResourceLimits(clusterResource));
|
||||
InOrder allocationOrder = inOrder(a, c, b);
|
||||
allocationOrder.verify(a).assignContainers(eq(clusterResource),
|
||||
any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits());
|
||||
any(FiCaSchedulerNode.class), anyResourceLimits());
|
||||
allocationOrder.verify(c).assignContainers(eq(clusterResource),
|
||||
any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits());
|
||||
any(FiCaSchedulerNode.class), anyResourceLimits());
|
||||
allocationOrder.verify(b).assignContainers(eq(clusterResource),
|
||||
any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits());
|
||||
any(FiCaSchedulerNode.class), anyResourceLimits());
|
||||
verifyQueueMetrics(a, 1*GB, clusterResource);
|
||||
verifyQueueMetrics(b, 6*GB, clusterResource);
|
||||
verifyQueueMetrics(c, 3*GB, clusterResource);
|
||||
|
@ -501,17 +516,17 @@ when(queue).assignContainers(eq(clusterResource), eq(node), eq(false),
|
|||
stubQueueAllocation(b3, clusterResource, node_2, 1*GB);
|
||||
stubQueueAllocation(b1, clusterResource, node_2, 1*GB);
|
||||
stubQueueAllocation(c, clusterResource, node_2, 1*GB);
|
||||
root.assignContainers(clusterResource, node_2, false,
|
||||
root.assignContainers(clusterResource, node_2,
|
||||
new ResourceLimits(clusterResource));
|
||||
allocationOrder = inOrder(a, a2, a1, b, c);
|
||||
allocationOrder.verify(a).assignContainers(eq(clusterResource),
|
||||
any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits());
|
||||
any(FiCaSchedulerNode.class), anyResourceLimits());
|
||||
allocationOrder.verify(a2).assignContainers(eq(clusterResource),
|
||||
any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits());
|
||||
any(FiCaSchedulerNode.class), anyResourceLimits());
|
||||
allocationOrder.verify(b).assignContainers(eq(clusterResource),
|
||||
any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits());
|
||||
any(FiCaSchedulerNode.class), anyResourceLimits());
|
||||
allocationOrder.verify(c).assignContainers(eq(clusterResource),
|
||||
any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits());
|
||||
any(FiCaSchedulerNode.class), anyResourceLimits());
|
||||
verifyQueueMetrics(a, 3*GB, clusterResource);
|
||||
verifyQueueMetrics(b, 8*GB, clusterResource);
|
||||
verifyQueueMetrics(c, 4*GB, clusterResource);
|
||||
|
@ -611,7 +626,7 @@ when(queue).assignContainers(eq(clusterResource), eq(node), eq(false),
|
|||
// Simulate B returning a container on node_0
|
||||
stubQueueAllocation(a, clusterResource, node_0, 0*GB, NodeType.OFF_SWITCH);
|
||||
stubQueueAllocation(b, clusterResource, node_0, 1*GB, NodeType.OFF_SWITCH);
|
||||
root.assignContainers(clusterResource, node_0, false,
|
||||
root.assignContainers(clusterResource, node_0,
|
||||
new ResourceLimits(clusterResource));
|
||||
verifyQueueMetrics(a, 0*GB, clusterResource);
|
||||
verifyQueueMetrics(b, 1*GB, clusterResource);
|
||||
|
@ -620,13 +635,13 @@ when(queue).assignContainers(eq(clusterResource), eq(node), eq(false),
|
|||
// also, B gets a scheduling opportunity since A allocates RACK_LOCAL
|
||||
stubQueueAllocation(a, clusterResource, node_1, 2*GB, NodeType.RACK_LOCAL);
|
||||
stubQueueAllocation(b, clusterResource, node_1, 1*GB, NodeType.OFF_SWITCH);
|
||||
root.assignContainers(clusterResource, node_1, false,
|
||||
root.assignContainers(clusterResource, node_1,
|
||||
new ResourceLimits(clusterResource));
|
||||
InOrder allocationOrder = inOrder(a, b);
|
||||
allocationOrder.verify(a).assignContainers(eq(clusterResource),
|
||||
any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits());
|
||||
any(FiCaSchedulerNode.class), anyResourceLimits());
|
||||
allocationOrder.verify(b).assignContainers(eq(clusterResource),
|
||||
any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits());
|
||||
any(FiCaSchedulerNode.class), anyResourceLimits());
|
||||
verifyQueueMetrics(a, 2*GB, clusterResource);
|
||||
verifyQueueMetrics(b, 2*GB, clusterResource);
|
||||
|
||||
|
@ -635,13 +650,13 @@ when(queue).assignContainers(eq(clusterResource), eq(node), eq(false),
|
|||
// However, since B returns off-switch, A won't get an opportunity
|
||||
stubQueueAllocation(a, clusterResource, node_0, 1*GB, NodeType.NODE_LOCAL);
|
||||
stubQueueAllocation(b, clusterResource, node_0, 2*GB, NodeType.OFF_SWITCH);
|
||||
root.assignContainers(clusterResource, node_0, false,
|
||||
root.assignContainers(clusterResource, node_0,
|
||||
new ResourceLimits(clusterResource));
|
||||
allocationOrder = inOrder(b, a);
|
||||
allocationOrder.verify(b).assignContainers(eq(clusterResource),
|
||||
any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits());
|
||||
any(FiCaSchedulerNode.class), anyResourceLimits());
|
||||
allocationOrder.verify(a).assignContainers(eq(clusterResource),
|
||||
any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits());
|
||||
any(FiCaSchedulerNode.class), anyResourceLimits());
|
||||
verifyQueueMetrics(a, 2*GB, clusterResource);
|
||||
verifyQueueMetrics(b, 4*GB, clusterResource);
|
||||
|
||||
|
@ -680,7 +695,7 @@ when(queue).assignContainers(eq(clusterResource), eq(node), eq(false),
|
|||
// Simulate B3 returning a container on node_0
|
||||
stubQueueAllocation(b2, clusterResource, node_0, 0*GB, NodeType.OFF_SWITCH);
|
||||
stubQueueAllocation(b3, clusterResource, node_0, 1*GB, NodeType.OFF_SWITCH);
|
||||
root.assignContainers(clusterResource, node_0, false,
|
||||
root.assignContainers(clusterResource, node_0,
|
||||
new ResourceLimits(clusterResource));
|
||||
verifyQueueMetrics(b2, 0*GB, clusterResource);
|
||||
verifyQueueMetrics(b3, 1*GB, clusterResource);
|
||||
|
@ -689,13 +704,13 @@ when(queue).assignContainers(eq(clusterResource), eq(node), eq(false),
|
|||
// also, B3 gets a scheduling opportunity since B2 allocates RACK_LOCAL
|
||||
stubQueueAllocation(b2, clusterResource, node_1, 1*GB, NodeType.RACK_LOCAL);
|
||||
stubQueueAllocation(b3, clusterResource, node_1, 1*GB, NodeType.OFF_SWITCH);
|
||||
root.assignContainers(clusterResource, node_1, false,
|
||||
root.assignContainers(clusterResource, node_1,
|
||||
new ResourceLimits(clusterResource));
|
||||
InOrder allocationOrder = inOrder(b2, b3);
|
||||
allocationOrder.verify(b2).assignContainers(eq(clusterResource),
|
||||
any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits());
|
||||
any(FiCaSchedulerNode.class), anyResourceLimits());
|
||||
allocationOrder.verify(b3).assignContainers(eq(clusterResource),
|
||||
any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits());
|
||||
any(FiCaSchedulerNode.class), anyResourceLimits());
|
||||
verifyQueueMetrics(b2, 1*GB, clusterResource);
|
||||
verifyQueueMetrics(b3, 2*GB, clusterResource);
|
||||
|
||||
|
@ -704,13 +719,13 @@ when(queue).assignContainers(eq(clusterResource), eq(node), eq(false),
|
|||
// However, since B3 returns off-switch, B2 won't get an opportunity
|
||||
stubQueueAllocation(b2, clusterResource, node_0, 1*GB, NodeType.NODE_LOCAL);
|
||||
stubQueueAllocation(b3, clusterResource, node_0, 1*GB, NodeType.OFF_SWITCH);
|
||||
root.assignContainers(clusterResource, node_0, false,
|
||||
root.assignContainers(clusterResource, node_0,
|
||||
new ResourceLimits(clusterResource));
|
||||
allocationOrder = inOrder(b3, b2);
|
||||
allocationOrder.verify(b3).assignContainers(eq(clusterResource),
|
||||
any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits());
|
||||
any(FiCaSchedulerNode.class), anyResourceLimits());
|
||||
allocationOrder.verify(b2).assignContainers(eq(clusterResource),
|
||||
any(FiCaSchedulerNode.class), anyBoolean(), anyResourceLimits());
|
||||
any(FiCaSchedulerNode.class), anyResourceLimits());
|
||||
verifyQueueMetrics(b2, 1*GB, clusterResource);
|
||||
verifyQueueMetrics(b3, 3*GB, clusterResource);
|
||||
|
||||
|
|
|
@ -265,7 +265,7 @@ public class TestReservations {
|
|||
|
||||
// Start testing...
|
||||
// Only AM
|
||||
a.assignContainers(clusterResource, node_0, false,
|
||||
a.assignContainers(clusterResource, node_0,
|
||||
new ResourceLimits(clusterResource));
|
||||
assertEquals(2 * GB, a.getUsedResources().getMemory());
|
||||
assertEquals(2 * GB, app_0.getCurrentConsumption().getMemory());
|
||||
|
@ -277,7 +277,7 @@ public class TestReservations {
|
|||
assertEquals(0 * GB, node_2.getUsedResource().getMemory());
|
||||
|
||||
// Only 1 map - simulating reduce
|
||||
a.assignContainers(clusterResource, node_0, false,
|
||||
a.assignContainers(clusterResource, node_0,
|
||||
new ResourceLimits(clusterResource));
|
||||
assertEquals(5 * GB, a.getUsedResources().getMemory());
|
||||
assertEquals(5 * GB, app_0.getCurrentConsumption().getMemory());
|
||||
|
@ -289,7 +289,7 @@ public class TestReservations {
|
|||
assertEquals(0 * GB, node_2.getUsedResource().getMemory());
|
||||
|
||||
// Only 1 map to other node - simulating reduce
|
||||
a.assignContainers(clusterResource, node_1, false,
|
||||
a.assignContainers(clusterResource, node_1,
|
||||
new ResourceLimits(clusterResource));
|
||||
assertEquals(8 * GB, a.getUsedResources().getMemory());
|
||||
assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory());
|
||||
|
@ -304,7 +304,7 @@ public class TestReservations {
|
|||
assertEquals(2, app_0.getTotalRequiredResources(priorityReduce));
|
||||
|
||||
// try to assign reducer (5G on node 0 and should reserve)
|
||||
a.assignContainers(clusterResource, node_0, false,
|
||||
a.assignContainers(clusterResource, node_0,
|
||||
new ResourceLimits(clusterResource));
|
||||
assertEquals(13 * GB, a.getUsedResources().getMemory());
|
||||
assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory());
|
||||
|
@ -320,7 +320,7 @@ public class TestReservations {
|
|||
assertEquals(2, app_0.getTotalRequiredResources(priorityReduce));
|
||||
|
||||
// assign reducer to node 2
|
||||
a.assignContainers(clusterResource, node_2, false,
|
||||
a.assignContainers(clusterResource, node_2,
|
||||
new ResourceLimits(clusterResource));
|
||||
assertEquals(18 * GB, a.getUsedResources().getMemory());
|
||||
assertEquals(13 * GB, app_0.getCurrentConsumption().getMemory());
|
||||
|
@ -337,7 +337,7 @@ public class TestReservations {
|
|||
|
||||
// node_1 heartbeat and unreserves from node_0 in order to allocate
|
||||
// on node_1
|
||||
a.assignContainers(clusterResource, node_1, false,
|
||||
a.assignContainers(clusterResource, node_1,
|
||||
new ResourceLimits(clusterResource));
|
||||
assertEquals(18 * GB, a.getUsedResources().getMemory());
|
||||
assertEquals(18 * GB, app_0.getCurrentConsumption().getMemory());
|
||||
|
@ -421,7 +421,7 @@ public class TestReservations {
|
|||
|
||||
// Start testing...
|
||||
// Only AM
|
||||
a.assignContainers(clusterResource, node_0, false,
|
||||
a.assignContainers(clusterResource, node_0,
|
||||
new ResourceLimits(clusterResource));
|
||||
assertEquals(2 * GB, a.getUsedResources().getMemory());
|
||||
assertEquals(2 * GB, app_0.getCurrentConsumption().getMemory());
|
||||
|
@ -433,7 +433,7 @@ public class TestReservations {
|
|||
assertEquals(0 * GB, node_2.getUsedResource().getMemory());
|
||||
|
||||
// Only 1 map - simulating reduce
|
||||
a.assignContainers(clusterResource, node_0, false,
|
||||
a.assignContainers(clusterResource, node_0,
|
||||
new ResourceLimits(clusterResource));
|
||||
assertEquals(5 * GB, a.getUsedResources().getMemory());
|
||||
assertEquals(5 * GB, app_0.getCurrentConsumption().getMemory());
|
||||
|
@ -445,7 +445,7 @@ public class TestReservations {
|
|||
assertEquals(0 * GB, node_2.getUsedResource().getMemory());
|
||||
|
||||
// Only 1 map to other node - simulating reduce
|
||||
a.assignContainers(clusterResource, node_1, false,
|
||||
a.assignContainers(clusterResource, node_1,
|
||||
new ResourceLimits(clusterResource));
|
||||
assertEquals(8 * GB, a.getUsedResources().getMemory());
|
||||
assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory());
|
||||
|
@ -460,7 +460,7 @@ public class TestReservations {
|
|||
assertEquals(2, app_0.getTotalRequiredResources(priorityReduce));
|
||||
|
||||
// try to assign reducer (5G on node 0 and should reserve)
|
||||
a.assignContainers(clusterResource, node_0, false,
|
||||
a.assignContainers(clusterResource, node_0,
|
||||
new ResourceLimits(clusterResource));
|
||||
assertEquals(13 * GB, a.getUsedResources().getMemory());
|
||||
assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory());
|
||||
|
@ -476,7 +476,7 @@ public class TestReservations {
|
|||
assertEquals(2, app_0.getTotalRequiredResources(priorityReduce));
|
||||
|
||||
// assign reducer to node 2
|
||||
a.assignContainers(clusterResource, node_2, false,
|
||||
a.assignContainers(clusterResource, node_2,
|
||||
new ResourceLimits(clusterResource));
|
||||
assertEquals(18 * GB, a.getUsedResources().getMemory());
|
||||
assertEquals(13 * GB, app_0.getCurrentConsumption().getMemory());
|
||||
|
@ -493,7 +493,7 @@ public class TestReservations {
|
|||
|
||||
// node_1 heartbeat and won't unreserve from node_0, potentially stuck
|
||||
// if AM doesn't handle
|
||||
a.assignContainers(clusterResource, node_1, false,
|
||||
a.assignContainers(clusterResource, node_1,
|
||||
new ResourceLimits(clusterResource));
|
||||
assertEquals(18 * GB, a.getUsedResources().getMemory());
|
||||
assertEquals(13 * GB, app_0.getCurrentConsumption().getMemory());
|
||||
|
@ -569,7 +569,7 @@ public class TestReservations {
|
|||
|
||||
// Start testing...
|
||||
// Only AM
|
||||
a.assignContainers(clusterResource, node_0, false,
|
||||
a.assignContainers(clusterResource, node_0,
|
||||
new ResourceLimits(clusterResource));
|
||||
assertEquals(2 * GB, a.getUsedResources().getMemory());
|
||||
assertEquals(2 * GB, app_0.getCurrentConsumption().getMemory());
|
||||
|
@ -580,7 +580,7 @@ public class TestReservations {
|
|||
assertEquals(0 * GB, node_1.getUsedResource().getMemory());
|
||||
|
||||
// Only 1 map - simulating reduce
|
||||
a.assignContainers(clusterResource, node_0, false,
|
||||
a.assignContainers(clusterResource, node_0,
|
||||
new ResourceLimits(clusterResource));
|
||||
assertEquals(5 * GB, a.getUsedResources().getMemory());
|
||||
assertEquals(5 * GB, app_0.getCurrentConsumption().getMemory());
|
||||
|
@ -591,7 +591,7 @@ public class TestReservations {
|
|||
assertEquals(0 * GB, node_1.getUsedResource().getMemory());
|
||||
|
||||
// Only 1 map to other node - simulating reduce
|
||||
a.assignContainers(clusterResource, node_1, false,
|
||||
a.assignContainers(clusterResource, node_1,
|
||||
new ResourceLimits(clusterResource));
|
||||
assertEquals(8 * GB, a.getUsedResources().getMemory());
|
||||
assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory());
|
||||
|
@ -605,7 +605,7 @@ public class TestReservations {
|
|||
assertEquals(2, app_0.getTotalRequiredResources(priorityReduce));
|
||||
|
||||
// try to assign reducer (5G on node 0 and should reserve)
|
||||
a.assignContainers(clusterResource, node_0, false,
|
||||
a.assignContainers(clusterResource, node_0,
|
||||
new ResourceLimits(clusterResource));
|
||||
assertEquals(13 * GB, a.getUsedResources().getMemory());
|
||||
assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory());
|
||||
|
@ -620,7 +620,7 @@ public class TestReservations {
|
|||
assertEquals(2, app_0.getTotalRequiredResources(priorityReduce));
|
||||
|
||||
// could allocate but told need to unreserve first
|
||||
a.assignContainers(clusterResource, node_1, true,
|
||||
a.assignContainers(clusterResource, node_1,
|
||||
new ResourceLimits(clusterResource));
|
||||
assertEquals(13 * GB, a.getUsedResources().getMemory());
|
||||
assertEquals(13 * GB, app_0.getCurrentConsumption().getMemory());
|
||||
|
@ -652,6 +652,8 @@ public class TestReservations {
|
|||
String host_1 = "host_1";
|
||||
FiCaSchedulerNode node_1 = TestUtils.getMockNode(host_1, DEFAULT_RACK, 0,
|
||||
8 * GB);
|
||||
|
||||
Resource clusterResource = Resources.createResource(2 * 8 * GB);
|
||||
|
||||
// Setup resource-requests
|
||||
Priority priorityMap = TestUtils.createMockPriority(5);
|
||||
|
@ -681,23 +683,28 @@ public class TestReservations {
|
|||
node_0.getNodeID(), "user", rmContext);
|
||||
|
||||
// no reserved containers
|
||||
NodeId unreserveId = app_0.getNodeIdToUnreserve(priorityMap, capability);
|
||||
NodeId unreserveId =
|
||||
app_0.getNodeIdToUnreserve(priorityMap, capability,
|
||||
cs.getResourceCalculator(), clusterResource);
|
||||
assertEquals(null, unreserveId);
|
||||
|
||||
// no reserved containers - reserve then unreserve
|
||||
app_0.reserve(node_0, priorityMap, rmContainer_1, container_1);
|
||||
app_0.unreserve(node_0, priorityMap);
|
||||
unreserveId = app_0.getNodeIdToUnreserve(priorityMap, capability);
|
||||
unreserveId = app_0.getNodeIdToUnreserve(priorityMap, capability,
|
||||
cs.getResourceCalculator(), clusterResource);
|
||||
assertEquals(null, unreserveId);
|
||||
|
||||
// no container large enough is reserved
|
||||
app_0.reserve(node_0, priorityMap, rmContainer_1, container_1);
|
||||
unreserveId = app_0.getNodeIdToUnreserve(priorityMap, capability);
|
||||
unreserveId = app_0.getNodeIdToUnreserve(priorityMap, capability,
|
||||
cs.getResourceCalculator(), clusterResource);
|
||||
assertEquals(null, unreserveId);
|
||||
|
||||
// reserve one that is now large enough
|
||||
app_0.reserve(node_1, priorityMap, rmContainer, container);
|
||||
unreserveId = app_0.getNodeIdToUnreserve(priorityMap, capability);
|
||||
unreserveId = app_0.getNodeIdToUnreserve(priorityMap, capability,
|
||||
cs.getResourceCalculator(), clusterResource);
|
||||
assertEquals(node_1.getNodeID(), unreserveId);
|
||||
}
|
||||
|
||||
|
@ -741,14 +748,14 @@ public class TestReservations {
|
|||
|
||||
// nothing reserved
|
||||
boolean res = a.findNodeToUnreserve(csContext.getClusterResource(),
|
||||
node_1, app_0, priorityMap, capability);
|
||||
node_1, app_0, priorityMap, capability, capability);
|
||||
assertFalse(res);
|
||||
|
||||
// reserved but scheduler doesn't know about that node.
|
||||
app_0.reserve(node_1, priorityMap, rmContainer, container);
|
||||
node_1.reserveResource(app_0, priorityMap, rmContainer);
|
||||
res = a.findNodeToUnreserve(csContext.getClusterResource(), node_1, app_0,
|
||||
priorityMap, capability);
|
||||
priorityMap, capability, capability);
|
||||
assertFalse(res);
|
||||
}
|
||||
|
||||
|
@ -815,7 +822,7 @@ public class TestReservations {
|
|||
|
||||
// Start testing...
|
||||
// Only AM
|
||||
a.assignContainers(clusterResource, node_0, false,
|
||||
a.assignContainers(clusterResource, node_0,
|
||||
new ResourceLimits(clusterResource));
|
||||
assertEquals(2 * GB, a.getUsedResources().getMemory());
|
||||
assertEquals(2 * GB, app_0.getCurrentConsumption().getMemory());
|
||||
|
@ -826,7 +833,7 @@ public class TestReservations {
|
|||
assertEquals(0 * GB, node_1.getUsedResource().getMemory());
|
||||
|
||||
// Only 1 map - simulating reduce
|
||||
a.assignContainers(clusterResource, node_0, false,
|
||||
a.assignContainers(clusterResource, node_0,
|
||||
new ResourceLimits(clusterResource));
|
||||
assertEquals(5 * GB, a.getUsedResources().getMemory());
|
||||
assertEquals(5 * GB, app_0.getCurrentConsumption().getMemory());
|
||||
|
@ -837,7 +844,7 @@ public class TestReservations {
|
|||
assertEquals(0 * GB, node_1.getUsedResource().getMemory());
|
||||
|
||||
// Only 1 map to other node - simulating reduce
|
||||
a.assignContainers(clusterResource, node_1, false,
|
||||
a.assignContainers(clusterResource, node_1,
|
||||
new ResourceLimits(clusterResource));
|
||||
assertEquals(8 * GB, a.getUsedResources().getMemory());
|
||||
assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory());
|
||||
|
@ -852,14 +859,15 @@ public class TestReservations {
|
|||
// absoluteMaxCapacity
|
||||
Resource capability = Resources.createResource(32 * GB, 0);
|
||||
boolean res =
|
||||
a.canAssignToThisQueue(clusterResource, capability,
|
||||
CommonNodeLabelsManager.EMPTY_STRING_SET, app_0, true);
|
||||
a.canAssignToThisQueue(clusterResource,
|
||||
CommonNodeLabelsManager.EMPTY_STRING_SET, new ResourceLimits(
|
||||
clusterResource), capability, Resources.none());
|
||||
assertFalse(res);
|
||||
|
||||
// now add in reservations and make sure it continues if config set
|
||||
// allocate to queue so that the potential new capacity is greater then
|
||||
// absoluteMaxCapacity
|
||||
a.assignContainers(clusterResource, node_0, false,
|
||||
a.assignContainers(clusterResource, node_0,
|
||||
new ResourceLimits(clusterResource));
|
||||
assertEquals(13 * GB, a.getUsedResources().getMemory());
|
||||
assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory());
|
||||
|
@ -872,14 +880,17 @@ public class TestReservations {
|
|||
|
||||
capability = Resources.createResource(5 * GB, 0);
|
||||
res =
|
||||
a.canAssignToThisQueue(clusterResource, capability,
|
||||
CommonNodeLabelsManager.EMPTY_STRING_SET, app_0, true);
|
||||
a.canAssignToThisQueue(clusterResource,
|
||||
CommonNodeLabelsManager.EMPTY_STRING_SET, new ResourceLimits(
|
||||
clusterResource), capability, Resources
|
||||
.createResource(5 * GB));
|
||||
assertTrue(res);
|
||||
|
||||
// tell to not check reservations
|
||||
res =
|
||||
a.canAssignToThisQueue(clusterResource, capability,
|
||||
CommonNodeLabelsManager.EMPTY_STRING_SET, app_0, false);
|
||||
a.canAssignToThisQueue(clusterResource,
|
||||
CommonNodeLabelsManager.EMPTY_STRING_SET, new ResourceLimits(
|
||||
clusterResource), capability, Resources.none());
|
||||
assertFalse(res);
|
||||
|
||||
refreshQueuesTurnOffReservationsContLook(a, csConf);
|
||||
|
@ -887,13 +898,16 @@ public class TestReservations {
|
|||
// should return false no matter what checkReservations is passed
|
||||
// in since feature is off
|
||||
res =
|
||||
a.canAssignToThisQueue(clusterResource, capability,
|
||||
CommonNodeLabelsManager.EMPTY_STRING_SET, app_0, false);
|
||||
a.canAssignToThisQueue(clusterResource,
|
||||
CommonNodeLabelsManager.EMPTY_STRING_SET, new ResourceLimits(
|
||||
clusterResource), capability, Resources.none());
|
||||
assertFalse(res);
|
||||
|
||||
res =
|
||||
a.canAssignToThisQueue(clusterResource, capability,
|
||||
CommonNodeLabelsManager.EMPTY_STRING_SET, app_0, true);
|
||||
a.canAssignToThisQueue(clusterResource,
|
||||
CommonNodeLabelsManager.EMPTY_STRING_SET, new ResourceLimits(
|
||||
clusterResource), capability, Resources
|
||||
.createResource(5 * GB));
|
||||
assertFalse(res);
|
||||
}
|
||||
|
||||
|
@ -984,16 +998,16 @@ public class TestReservations {
|
|||
app_0.updateResourceRequests(Collections.singletonList(TestUtils
|
||||
.createResourceRequest(ResourceRequest.ANY, 2 * GB, 1, true,
|
||||
priorityAM, recordFactory)));
|
||||
app_0.updateResourceRequests(Collections.singletonList(TestUtils
|
||||
.createResourceRequest(ResourceRequest.ANY, 5 * GB, 2, true,
|
||||
priorityReduce, recordFactory)));
|
||||
app_0.updateResourceRequests(Collections.singletonList(TestUtils
|
||||
.createResourceRequest(ResourceRequest.ANY, 3 * GB, 2, true,
|
||||
priorityMap, recordFactory)));
|
||||
app_0.updateResourceRequests(Collections.singletonList(TestUtils
|
||||
.createResourceRequest(ResourceRequest.ANY, 5 * GB, 2, true,
|
||||
priorityReduce, recordFactory)));
|
||||
|
||||
// Start testing...
|
||||
// Only AM
|
||||
a.assignContainers(clusterResource, node_0, false,
|
||||
a.assignContainers(clusterResource, node_0,
|
||||
new ResourceLimits(clusterResource));
|
||||
assertEquals(2 * GB, a.getUsedResources().getMemory());
|
||||
assertEquals(2 * GB, app_0.getCurrentConsumption().getMemory());
|
||||
|
@ -1004,7 +1018,7 @@ public class TestReservations {
|
|||
assertEquals(0 * GB, node_1.getUsedResource().getMemory());
|
||||
|
||||
// Only 1 map - simulating reduce
|
||||
a.assignContainers(clusterResource, node_0, false,
|
||||
a.assignContainers(clusterResource, node_0,
|
||||
new ResourceLimits(clusterResource));
|
||||
assertEquals(5 * GB, a.getUsedResources().getMemory());
|
||||
assertEquals(5 * GB, app_0.getCurrentConsumption().getMemory());
|
||||
|
@ -1015,7 +1029,7 @@ public class TestReservations {
|
|||
assertEquals(0 * GB, node_1.getUsedResource().getMemory());
|
||||
|
||||
// Only 1 map to other node - simulating reduce
|
||||
a.assignContainers(clusterResource, node_1, false,
|
||||
a.assignContainers(clusterResource, node_1,
|
||||
new ResourceLimits(clusterResource));
|
||||
assertEquals(8 * GB, a.getUsedResources().getMemory());
|
||||
assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory());
|
||||
|
@ -1029,7 +1043,7 @@ public class TestReservations {
|
|||
// now add in reservations and make sure it continues if config set
|
||||
// allocate to queue so that the potential new capacity is greater then
|
||||
// absoluteMaxCapacity
|
||||
a.assignContainers(clusterResource, node_0, false,
|
||||
a.assignContainers(clusterResource, node_0,
|
||||
new ResourceLimits(clusterResource));
|
||||
assertEquals(13 * GB, a.getUsedResources().getMemory());
|
||||
assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory());
|
||||
|
@ -1116,19 +1130,19 @@ public class TestReservations {
|
|||
app_0.updateResourceRequests(Collections.singletonList(TestUtils
|
||||
.createResourceRequest(ResourceRequest.ANY, 2 * GB, 1, true,
|
||||
priorityAM, recordFactory)));
|
||||
app_0.updateResourceRequests(Collections.singletonList(TestUtils
|
||||
.createResourceRequest(ResourceRequest.ANY, 5 * GB, 1, true,
|
||||
priorityReduce, recordFactory)));
|
||||
app_0.updateResourceRequests(Collections.singletonList(TestUtils
|
||||
.createResourceRequest(ResourceRequest.ANY, 3 * GB, 2, true,
|
||||
priorityMap, recordFactory)));
|
||||
app_0.updateResourceRequests(Collections.singletonList(TestUtils
|
||||
.createResourceRequest(ResourceRequest.ANY, 5 * GB, 1, true,
|
||||
priorityReduce, recordFactory)));
|
||||
app_0.updateResourceRequests(Collections.singletonList(TestUtils
|
||||
.createResourceRequest(ResourceRequest.ANY, 8 * GB, 2, true,
|
||||
priorityLast, recordFactory)));
|
||||
|
||||
// Start testing...
|
||||
// Only AM
|
||||
a.assignContainers(clusterResource, node_0, false,
|
||||
a.assignContainers(clusterResource, node_0,
|
||||
new ResourceLimits(clusterResource));
|
||||
assertEquals(2 * GB, a.getUsedResources().getMemory());
|
||||
assertEquals(2 * GB, app_0.getCurrentConsumption().getMemory());
|
||||
|
@ -1140,7 +1154,7 @@ public class TestReservations {
|
|||
assertEquals(0 * GB, node_2.getUsedResource().getMemory());
|
||||
|
||||
// Only 1 map - simulating reduce
|
||||
a.assignContainers(clusterResource, node_0, false,
|
||||
a.assignContainers(clusterResource, node_0,
|
||||
new ResourceLimits(clusterResource));
|
||||
assertEquals(5 * GB, a.getUsedResources().getMemory());
|
||||
assertEquals(5 * GB, app_0.getCurrentConsumption().getMemory());
|
||||
|
@ -1152,7 +1166,7 @@ public class TestReservations {
|
|||
assertEquals(0 * GB, node_2.getUsedResource().getMemory());
|
||||
|
||||
// Only 1 map to other node - simulating reduce
|
||||
a.assignContainers(clusterResource, node_1, false,
|
||||
a.assignContainers(clusterResource, node_1,
|
||||
new ResourceLimits(clusterResource));
|
||||
assertEquals(8 * GB, a.getUsedResources().getMemory());
|
||||
assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory());
|
||||
|
@ -1164,38 +1178,41 @@ public class TestReservations {
|
|||
assertEquals(3 * GB, node_1.getUsedResource().getMemory());
|
||||
assertEquals(0 * GB, node_2.getUsedResource().getMemory());
|
||||
|
||||
// try to assign reducer (5G on node 0), but tell it
|
||||
// it has to unreserve. No room to allocate and shouldn't reserve
|
||||
// since nothing currently reserved.
|
||||
a.assignContainers(clusterResource, node_0, true,
|
||||
new ResourceLimits(clusterResource));
|
||||
// try to assign reducer (5G on node 0), but tell it's resource limits <
|
||||
// used (8G) + required (5G). It will not reserved since it has to unreserve
|
||||
// some resource. Even with continous reservation looking, we don't allow
|
||||
// unreserve resource to reserve container.
|
||||
a.assignContainers(clusterResource, node_0,
|
||||
new ResourceLimits(Resources.createResource(10 * GB)));
|
||||
assertEquals(8 * GB, a.getUsedResources().getMemory());
|
||||
assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory());
|
||||
assertEquals(0 * GB, a.getMetrics().getReservedMB());
|
||||
assertEquals(8 * GB, a.getMetrics().getAllocatedMB());
|
||||
assertEquals(16 * GB, a.getMetrics().getAvailableMB());
|
||||
assertEquals(16 * GB, app_0.getHeadroom().getMemory());
|
||||
// app_0's headroom = limit (10G) - used (8G) = 2G
|
||||
assertEquals(2 * GB, app_0.getHeadroom().getMemory());
|
||||
assertEquals(5 * GB, node_0.getUsedResource().getMemory());
|
||||
assertEquals(3 * GB, node_1.getUsedResource().getMemory());
|
||||
assertEquals(0 * GB, node_2.getUsedResource().getMemory());
|
||||
|
||||
// try to assign reducer (5G on node 2), but tell it
|
||||
// it has to unreserve. Has room but shouldn't reserve
|
||||
// since nothing currently reserved.
|
||||
a.assignContainers(clusterResource, node_2, true,
|
||||
new ResourceLimits(clusterResource));
|
||||
// try to assign reducer (5G on node 0), but tell it's resource limits <
|
||||
// used (8G) + required (5G). It will not reserved since it has to unreserve
|
||||
// some resource. Unfortunately, there's nothing to unreserve.
|
||||
a.assignContainers(clusterResource, node_2,
|
||||
new ResourceLimits(Resources.createResource(10 * GB)));
|
||||
assertEquals(8 * GB, a.getUsedResources().getMemory());
|
||||
assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory());
|
||||
assertEquals(0 * GB, a.getMetrics().getReservedMB());
|
||||
assertEquals(8 * GB, a.getMetrics().getAllocatedMB());
|
||||
assertEquals(16 * GB, a.getMetrics().getAvailableMB());
|
||||
assertEquals(16 * GB, app_0.getHeadroom().getMemory());
|
||||
// app_0's headroom = limit (10G) - used (8G) = 2G
|
||||
assertEquals(2 * GB, app_0.getHeadroom().getMemory());
|
||||
assertEquals(5 * GB, node_0.getUsedResource().getMemory());
|
||||
assertEquals(3 * GB, node_1.getUsedResource().getMemory());
|
||||
assertEquals(0 * GB, node_2.getUsedResource().getMemory());
|
||||
|
||||
// let it assign 5G to node_2
|
||||
a.assignContainers(clusterResource, node_2, false,
|
||||
a.assignContainers(clusterResource, node_2,
|
||||
new ResourceLimits(clusterResource));
|
||||
assertEquals(13 * GB, a.getUsedResources().getMemory());
|
||||
assertEquals(13 * GB, app_0.getCurrentConsumption().getMemory());
|
||||
|
@ -1208,7 +1225,7 @@ public class TestReservations {
|
|||
assertEquals(5 * GB, node_2.getUsedResource().getMemory());
|
||||
|
||||
// reserve 8G node_0
|
||||
a.assignContainers(clusterResource, node_0, false,
|
||||
a.assignContainers(clusterResource, node_0,
|
||||
new ResourceLimits(clusterResource));
|
||||
assertEquals(21 * GB, a.getUsedResources().getMemory());
|
||||
assertEquals(13 * GB, app_0.getCurrentConsumption().getMemory());
|
||||
|
@ -1223,7 +1240,7 @@ public class TestReservations {
|
|||
// try to assign (8G on node 2). No room to allocate,
|
||||
// continued to try due to having reservation above,
|
||||
// but hits queue limits so can't reserve anymore.
|
||||
a.assignContainers(clusterResource, node_2, false,
|
||||
a.assignContainers(clusterResource, node_2,
|
||||
new ResourceLimits(clusterResource));
|
||||
assertEquals(21 * GB, a.getUsedResources().getMemory());
|
||||
assertEquals(13 * GB, app_0.getCurrentConsumption().getMemory());
|
||||
|
|
Loading…
Reference in New Issue