YARN-3434. Interaction between reservations and userlimit can result in significant ULF violation

This commit is contained in:
tgraves 2015-04-23 14:39:25 +00:00
parent baf8bc6c48
commit 189a63a719
5 changed files with 186 additions and 166 deletions

View File

@ -252,6 +252,9 @@ Release 2.8.0 - UNRELEASED
YARN-3495. Confusing log generated by FairScheduler. YARN-3495. Confusing log generated by FairScheduler.
(Brahma Reddy Battula via ozawa) (Brahma Reddy Battula via ozawa)
YARN-3434. Interaction between reservations and userlimit can result in
significant ULF violation (tgraves)
Release 2.7.1 - UNRELEASED Release 2.7.1 - UNRELEASED
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -19,22 +19,44 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler; package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.util.resource.Resources;
/** /**
* Resource limits for queues/applications, this means max overall (please note * Resource limits for queues/applications, this means max overall (please note
* that, it's not "extra") resource you can get. * that, it's not "extra") resource you can get.
*/ */
public class ResourceLimits { public class ResourceLimits {
volatile Resource limit;
// This is special limit that goes with the RESERVE_CONT_LOOK_ALL_NODES
// config. This limit indicates how much we need to unreserve to allocate
// another container.
private volatile Resource amountNeededUnreserve;
public ResourceLimits(Resource limit) { public ResourceLimits(Resource limit) {
this.amountNeededUnreserve = Resources.none();
this.limit = limit; this.limit = limit;
} }
volatile Resource limit; public ResourceLimits(Resource limit, Resource amountNeededUnreserve) {
this.amountNeededUnreserve = amountNeededUnreserve;
this.limit = limit;
}
public Resource getLimit() { public Resource getLimit() {
return limit; return limit;
} }
public Resource getAmountNeededUnreserve() {
return amountNeededUnreserve;
}
public void setLimit(Resource limit) { public void setLimit(Resource limit) {
this.limit = limit; this.limit = limit;
} }
public void setAmountNeededUnreserve(Resource amountNeededUnreserve) {
this.amountNeededUnreserve = amountNeededUnreserve;
}
} }

View File

@ -85,7 +85,7 @@ public abstract class AbstractCSQueue implements CSQueue {
// Track capacities like used-capcity/abs-used-capacity/capacity/abs-capacity, // Track capacities like used-capcity/abs-used-capacity/capacity/abs-capacity,
// etc. // etc.
QueueCapacities queueCapacities; QueueCapacities queueCapacities;
private final RecordFactory recordFactory = private final RecordFactory recordFactory =
RecordFactoryProvider.getRecordFactory(null); RecordFactoryProvider.getRecordFactory(null);
protected CapacitySchedulerContext csContext; protected CapacitySchedulerContext csContext;
@ -473,55 +473,55 @@ public abstract class AbstractCSQueue implements CSQueue {
getCurrentLimitResource(nodePartition, clusterResource, getCurrentLimitResource(nodePartition, clusterResource,
currentResourceLimits, schedulingMode); currentResourceLimits, schedulingMode);
// if reservation continous looking enabled, check to see if could we
// potentially use this node instead of a reserved node if the application
// has reserved containers.
// TODO, now only consider reservation cases when the node has no label
if (this.reservationsContinueLooking
&& nodePartition.equals(RMNodeLabelsManager.NO_LABEL)
&& Resources.greaterThan(resourceCalculator, clusterResource,
resourceCouldBeUnreserved, Resources.none())) {
// resource-without-reserved = used - reserved
Resource newTotalWithoutReservedResource =
Resources.subtract(newTotalResource, resourceCouldBeUnreserved);
// when total-used-without-reserved-resource < currentLimit, we still
// have chance to allocate on this node by unreserving some containers
if (Resources.lessThan(resourceCalculator, clusterResource,
newTotalWithoutReservedResource, currentLimitResource)) {
if (LOG.isDebugEnabled()) {
LOG.debug("try to use reserved: " + getQueueName()
+ " usedResources: " + queueUsage.getUsed()
+ ", clusterResources: " + clusterResource
+ ", reservedResources: " + resourceCouldBeUnreserved
+ ", capacity-without-reserved: "
+ newTotalWithoutReservedResource + ", maxLimitCapacity: "
+ currentLimitResource);
}
return true;
}
}
// Check if we over current-resource-limit computed.
if (Resources.greaterThan(resourceCalculator, clusterResource, if (Resources.greaterThan(resourceCalculator, clusterResource,
newTotalResource, currentLimitResource)) { newTotalResource, currentLimitResource)) {
return false;
}
if (LOG.isDebugEnabled()) { // if reservation continous looking enabled, check to see if could we
LOG.debug(getQueueName() // potentially use this node instead of a reserved node if the application
+ "Check assign to queue, nodePartition=" // has reserved containers.
+ nodePartition // TODO, now only consider reservation cases when the node has no label
+ " usedResources: " if (this.reservationsContinueLooking
+ queueUsage.getUsed(nodePartition) && nodePartition.equals(RMNodeLabelsManager.NO_LABEL)
+ " clusterResources: " && Resources.greaterThan(resourceCalculator, clusterResource,
+ clusterResource resourceCouldBeUnreserved, Resources.none())) {
+ " currentUsedCapacity " // resource-without-reserved = used - reserved
+ Resources.divide(resourceCalculator, clusterResource, Resource newTotalWithoutReservedResource =
queueUsage.getUsed(nodePartition), Resources.subtract(newTotalResource, resourceCouldBeUnreserved);
labelManager.getResourceByLabel(nodePartition, clusterResource))
+ " max-capacity: " // when total-used-without-reserved-resource < currentLimit, we still
+ queueCapacities.getAbsoluteMaximumCapacity(nodePartition) + ")"); // have chance to allocate on this node by unreserving some containers
if (Resources.lessThan(resourceCalculator, clusterResource,
newTotalWithoutReservedResource, currentLimitResource)) {
if (LOG.isDebugEnabled()) {
LOG.debug("try to use reserved: " + getQueueName()
+ " usedResources: " + queueUsage.getUsed()
+ ", clusterResources: " + clusterResource
+ ", reservedResources: " + resourceCouldBeUnreserved
+ ", capacity-without-reserved: "
+ newTotalWithoutReservedResource + ", maxLimitCapacity: "
+ currentLimitResource);
}
currentResourceLimits.setAmountNeededUnreserve(Resources.subtract(newTotalResource,
currentLimitResource));
return true;
}
}
if (LOG.isDebugEnabled()) {
LOG.debug(getQueueName()
+ "Check assign to queue, nodePartition="
+ nodePartition
+ " usedResources: "
+ queueUsage.getUsed(nodePartition)
+ " clusterResources: "
+ clusterResource
+ " currentUsedCapacity "
+ Resources.divide(resourceCalculator, clusterResource,
queueUsage.getUsed(nodePartition),
labelManager.getResourceByLabel(nodePartition, clusterResource))
+ " max-capacity: "
+ queueCapacities.getAbsoluteMaximumCapacity(nodePartition) + ")");
}
return false;
} }
return true; return true;
} }

View File

@ -119,8 +119,8 @@ public class LeafQueue extends AbstractCSQueue {
private final QueueResourceLimitsInfo queueResourceLimitsInfo = private final QueueResourceLimitsInfo queueResourceLimitsInfo =
new QueueResourceLimitsInfo(); new QueueResourceLimitsInfo();
private volatile ResourceLimits currentResourceLimits = null; private volatile ResourceLimits cachedResourceLimitsForHeadroom = null;
private OrderingPolicy<FiCaSchedulerApp> private OrderingPolicy<FiCaSchedulerApp>
orderingPolicy = new FifoOrderingPolicy<FiCaSchedulerApp>(); orderingPolicy = new FifoOrderingPolicy<FiCaSchedulerApp>();
@ -151,7 +151,7 @@ public class LeafQueue extends AbstractCSQueue {
this.lastClusterResource = clusterResource; this.lastClusterResource = clusterResource;
updateAbsoluteCapacityResource(clusterResource); updateAbsoluteCapacityResource(clusterResource);
this.currentResourceLimits = new ResourceLimits(clusterResource); this.cachedResourceLimitsForHeadroom = new ResourceLimits(clusterResource);
// Initialize headroom info, also used for calculating application // Initialize headroom info, also used for calculating application
// master resource limits. Since this happens during queue initialization // master resource limits. Since this happens during queue initialization
@ -715,14 +715,14 @@ public class LeafQueue extends AbstractCSQueue {
activateApplications(); activateApplications();
LOG.info("Application removed -" + LOG.info("Application removed -" +
" appId: " + application.getApplicationId() + " appId: " + application.getApplicationId() +
" user: " + application.getUser() + " user: " + application.getUser() +
" queue: " + getQueueName() + " queue: " + getQueueName() +
" #user-pending-applications: " + user.getPendingApplications() + " #user-pending-applications: " + user.getPendingApplications() +
" #user-active-applications: " + user.getActiveApplications() + " #user-active-applications: " + user.getActiveApplications() +
" #queue-pending-applications: " + getNumPendingApplications() + " #queue-pending-applications: " + getNumPendingApplications() +
" #queue-active-applications: " + getNumActiveApplications() " #queue-active-applications: " + getNumActiveApplications()
); );
} }
private synchronized FiCaSchedulerApp getApplication( private synchronized FiCaSchedulerApp getApplication(
@ -854,18 +854,18 @@ public class LeafQueue extends AbstractCSQueue {
// before all higher priority ones are serviced. // before all higher priority ones are serviced.
Resource userLimit = Resource userLimit =
computeUserLimitAndSetHeadroom(application, clusterResource, computeUserLimitAndSetHeadroom(application, clusterResource,
required, node.getPartition(), schedulingMode); required, node.getPartition(), schedulingMode);
// Check queue max-capacity limit // Check queue max-capacity limit
if (!super.canAssignToThisQueue(clusterResource, node.getPartition(), if (!super.canAssignToThisQueue(clusterResource, node.getPartition(),
this.currentResourceLimits, required, currentResourceLimits, required,
application.getCurrentReservation(), schedulingMode)) { application.getCurrentReservation(), schedulingMode)) {
return NULL_ASSIGNMENT; return NULL_ASSIGNMENT;
} }
// Check user limit // Check user limit
if (!canAssignToUser(clusterResource, application.getUser(), userLimit, if (!canAssignToUser(clusterResource, application.getUser(), userLimit,
application, true, node.getPartition())) { application, node.getPartition(), currentResourceLimits)) {
break; break;
} }
@ -906,9 +906,9 @@ public class LeafQueue extends AbstractCSQueue {
} }
// Try to schedule // Try to schedule
CSAssignment assignment = CSAssignment assignment =
assignContainersOnNode(clusterResource, node, application, priority, assignContainersOnNode(clusterResource, node, application, priority,
null, schedulingMode); null, schedulingMode, currentResourceLimits);
// Did the application skip this node? // Did the application skip this node?
if (assignment.getSkipped()) { if (assignment.getSkipped()) {
@ -975,7 +975,7 @@ public class LeafQueue extends AbstractCSQueue {
// Try to assign if we have sufficient resources // Try to assign if we have sufficient resources
CSAssignment tmp = CSAssignment tmp =
assignContainersOnNode(clusterResource, node, application, priority, assignContainersOnNode(clusterResource, node, application, priority,
rmContainer, schedulingMode); rmContainer, schedulingMode, new ResourceLimits(Resources.none()));
// Doesn't matter... since it's already charged for at time of reservation // Doesn't matter... since it's already charged for at time of reservation
// "re-reservation" is *free* // "re-reservation" is *free*
@ -1026,7 +1026,7 @@ public class LeafQueue extends AbstractCSQueue {
private void setQueueResourceLimitsInfo( private void setQueueResourceLimitsInfo(
Resource clusterResource) { Resource clusterResource) {
synchronized (queueResourceLimitsInfo) { synchronized (queueResourceLimitsInfo) {
queueResourceLimitsInfo.setQueueCurrentLimit(currentResourceLimits queueResourceLimitsInfo.setQueueCurrentLimit(cachedResourceLimitsForHeadroom
.getLimit()); .getLimit());
queueResourceLimitsInfo.setClusterResource(clusterResource); queueResourceLimitsInfo.setClusterResource(clusterResource);
} }
@ -1048,13 +1048,13 @@ public class LeafQueue extends AbstractCSQueue {
setQueueResourceLimitsInfo(clusterResource); setQueueResourceLimitsInfo(clusterResource);
Resource headroom = Resource headroom =
getHeadroom(queueUser, currentResourceLimits.getLimit(), getHeadroom(queueUser, cachedResourceLimitsForHeadroom.getLimit(),
clusterResource, userLimit); clusterResource, userLimit);
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("Headroom calculation for user " + user + ": " + LOG.debug("Headroom calculation for user " + user + ": " +
" userLimit=" + userLimit + " userLimit=" + userLimit +
" queueMaxAvailRes=" + currentResourceLimits.getLimit() + " queueMaxAvailRes=" + cachedResourceLimitsForHeadroom.getLimit() +
" consumed=" + queueUser.getUsed() + " consumed=" + queueUser.getUsed() +
" headroom=" + headroom); " headroom=" + headroom);
} }
@ -1169,7 +1169,7 @@ public class LeafQueue extends AbstractCSQueue {
@Private @Private
protected synchronized boolean canAssignToUser(Resource clusterResource, protected synchronized boolean canAssignToUser(Resource clusterResource,
String userName, Resource limit, FiCaSchedulerApp application, String userName, Resource limit, FiCaSchedulerApp application,
boolean checkReservations, String nodePartition) { String nodePartition, ResourceLimits currentResoureLimits) {
User user = getUser(userName); User user = getUser(userName);
// Note: We aren't considering the current request since there is a fixed // Note: We aren't considering the current request since there is a fixed
@ -1180,13 +1180,13 @@ public class LeafQueue extends AbstractCSQueue {
limit)) { limit)) {
// if enabled, check to see if could we potentially use this node instead // if enabled, check to see if could we potentially use this node instead
// of a reserved node if the application has reserved containers // of a reserved node if the application has reserved containers
if (this.reservationsContinueLooking && checkReservations if (this.reservationsContinueLooking &&
&& nodePartition.equals(CommonNodeLabelsManager.NO_LABEL)) { nodePartition.equals(CommonNodeLabelsManager.NO_LABEL)) {
if (Resources.lessThanOrEqual( if (Resources.lessThanOrEqual(
resourceCalculator, resourceCalculator,
clusterResource, clusterResource,
Resources.subtract(user.getUsed(), Resources.subtract(user.getUsed(),application.getCurrentReservation()),
application.getCurrentReservation()), limit)) { limit)) {
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("User " + userName + " in queue " + getQueueName() LOG.debug("User " + userName + " in queue " + getQueueName()
@ -1194,6 +1194,13 @@ public class LeafQueue extends AbstractCSQueue {
+ user.getUsed() + " reserved: " + user.getUsed() + " reserved: "
+ application.getCurrentReservation() + " limit: " + limit); + application.getCurrentReservation() + " limit: " + limit);
} }
Resource amountNeededToUnreserve = Resources.subtract(user.getUsed(nodePartition), limit);
// we can only acquire a new container if we unreserve first since we ignored the
// user limit. Choose the max of user limit or what was previously set by max
// capacity.
currentResoureLimits.setAmountNeededUnreserve(
Resources.max(resourceCalculator, clusterResource,
currentResoureLimits.getAmountNeededUnreserve(), amountNeededToUnreserve));
return true; return true;
} }
} }
@ -1240,7 +1247,8 @@ public class LeafQueue extends AbstractCSQueue {
private CSAssignment assignContainersOnNode(Resource clusterResource, private CSAssignment assignContainersOnNode(Resource clusterResource,
FiCaSchedulerNode node, FiCaSchedulerApp application, Priority priority, FiCaSchedulerNode node, FiCaSchedulerApp application, Priority priority,
RMContainer reservedContainer, SchedulingMode schedulingMode) { RMContainer reservedContainer, SchedulingMode schedulingMode,
ResourceLimits currentResoureLimits) {
CSAssignment assigned; CSAssignment assigned;
@ -1254,7 +1262,7 @@ public class LeafQueue extends AbstractCSQueue {
assigned = assigned =
assignNodeLocalContainers(clusterResource, nodeLocalResourceRequest, assignNodeLocalContainers(clusterResource, nodeLocalResourceRequest,
node, application, priority, reservedContainer, node, application, priority, reservedContainer,
allocatedContainer, schedulingMode); allocatedContainer, schedulingMode, currentResoureLimits);
if (Resources.greaterThan(resourceCalculator, clusterResource, if (Resources.greaterThan(resourceCalculator, clusterResource,
assigned.getResource(), Resources.none())) { assigned.getResource(), Resources.none())) {
@ -1283,7 +1291,7 @@ public class LeafQueue extends AbstractCSQueue {
assigned = assigned =
assignRackLocalContainers(clusterResource, rackLocalResourceRequest, assignRackLocalContainers(clusterResource, rackLocalResourceRequest,
node, application, priority, reservedContainer, node, application, priority, reservedContainer,
allocatedContainer, schedulingMode); allocatedContainer, schedulingMode, currentResoureLimits);
if (Resources.greaterThan(resourceCalculator, clusterResource, if (Resources.greaterThan(resourceCalculator, clusterResource,
assigned.getResource(), Resources.none())) { assigned.getResource(), Resources.none())) {
@ -1312,7 +1320,7 @@ public class LeafQueue extends AbstractCSQueue {
assigned = assigned =
assignOffSwitchContainers(clusterResource, offSwitchResourceRequest, assignOffSwitchContainers(clusterResource, offSwitchResourceRequest,
node, application, priority, reservedContainer, node, application, priority, reservedContainer,
allocatedContainer, schedulingMode); allocatedContainer, schedulingMode, currentResoureLimits);
// update locality statistics // update locality statistics
if (allocatedContainer.getValue() != null) { if (allocatedContainer.getValue() != null) {
@ -1324,19 +1332,11 @@ public class LeafQueue extends AbstractCSQueue {
return SKIP_ASSIGNMENT; return SKIP_ASSIGNMENT;
} }
private Resource getMinimumResourceNeedUnreserved(Resource askedResource) {
// First we need to get minimum resource we need unreserve
// minimum-resource-need-unreserve = used + asked - limit
return Resources.subtract(
Resources.add(queueUsage.getUsed(), askedResource),
currentResourceLimits.getLimit());
}
@Private @Private
protected boolean findNodeToUnreserve(Resource clusterResource, protected boolean findNodeToUnreserve(Resource clusterResource,
FiCaSchedulerNode node, FiCaSchedulerApp application, Priority priority, FiCaSchedulerNode node, FiCaSchedulerApp application, Priority priority,
Resource askedResource, Resource minimumUnreservedResource) { Resource minimumUnreservedResource) {
// need to unreserve some other container first // need to unreserve some other container first
NodeId idToUnreserve = NodeId idToUnreserve =
application.getNodeIdToUnreserve(priority, minimumUnreservedResource, application.getNodeIdToUnreserve(priority, minimumUnreservedResource,
@ -1357,7 +1357,7 @@ public class LeafQueue extends AbstractCSQueue {
LOG.debug("unreserving for app: " + application.getApplicationId() LOG.debug("unreserving for app: " + application.getApplicationId()
+ " on nodeId: " + idToUnreserve + " on nodeId: " + idToUnreserve
+ " in order to replace reserved application and place it on node: " + " in order to replace reserved application and place it on node: "
+ node.getNodeID() + " needing: " + askedResource); + node.getNodeID() + " needing: " + minimumUnreservedResource);
} }
// headroom // headroom
@ -1376,47 +1376,16 @@ public class LeafQueue extends AbstractCSQueue {
return true; return true;
} }
@Private
protected boolean checkLimitsToReserve(Resource clusterResource,
FiCaSchedulerApp application, Resource capability, String nodePartition,
SchedulingMode schedulingMode) {
// we can't reserve if we got here based on the limit
// checks assuming we could unreserve!!!
Resource userLimit = computeUserLimitAndSetHeadroom(application,
clusterResource, capability, nodePartition, schedulingMode);
// Check queue max-capacity limit,
// TODO: Consider reservation on labels
if (!canAssignToThisQueue(clusterResource, RMNodeLabelsManager.NO_LABEL,
this.currentResourceLimits, capability, Resources.none(), schedulingMode)) {
if (LOG.isDebugEnabled()) {
LOG.debug("was going to reserve but hit queue limit");
}
return false;
}
// Check user limit
if (!canAssignToUser(clusterResource, application.getUser(), userLimit,
application, false, nodePartition)) {
if (LOG.isDebugEnabled()) {
LOG.debug("was going to reserve but hit user limit");
}
return false;
}
return true;
}
private CSAssignment assignNodeLocalContainers(Resource clusterResource, private CSAssignment assignNodeLocalContainers(Resource clusterResource,
ResourceRequest nodeLocalResourceRequest, FiCaSchedulerNode node, ResourceRequest nodeLocalResourceRequest, FiCaSchedulerNode node,
FiCaSchedulerApp application, Priority priority, FiCaSchedulerApp application, Priority priority,
RMContainer reservedContainer, MutableObject allocatedContainer, RMContainer reservedContainer, MutableObject allocatedContainer,
SchedulingMode schedulingMode) { SchedulingMode schedulingMode, ResourceLimits currentResoureLimits) {
if (canAssign(application, priority, node, NodeType.NODE_LOCAL, if (canAssign(application, priority, node, NodeType.NODE_LOCAL,
reservedContainer)) { reservedContainer)) {
return assignContainer(clusterResource, node, application, priority, return assignContainer(clusterResource, node, application, priority,
nodeLocalResourceRequest, NodeType.NODE_LOCAL, reservedContainer, nodeLocalResourceRequest, NodeType.NODE_LOCAL, reservedContainer,
allocatedContainer, schedulingMode); allocatedContainer, schedulingMode, currentResoureLimits);
} }
return new CSAssignment(Resources.none(), NodeType.NODE_LOCAL); return new CSAssignment(Resources.none(), NodeType.NODE_LOCAL);
@ -1426,12 +1395,12 @@ public class LeafQueue extends AbstractCSQueue {
ResourceRequest rackLocalResourceRequest, FiCaSchedulerNode node, ResourceRequest rackLocalResourceRequest, FiCaSchedulerNode node,
FiCaSchedulerApp application, Priority priority, FiCaSchedulerApp application, Priority priority,
RMContainer reservedContainer, MutableObject allocatedContainer, RMContainer reservedContainer, MutableObject allocatedContainer,
SchedulingMode schedulingMode) { SchedulingMode schedulingMode, ResourceLimits currentResoureLimits) {
if (canAssign(application, priority, node, NodeType.RACK_LOCAL, if (canAssign(application, priority, node, NodeType.RACK_LOCAL,
reservedContainer)) { reservedContainer)) {
return assignContainer(clusterResource, node, application, priority, return assignContainer(clusterResource, node, application, priority,
rackLocalResourceRequest, NodeType.RACK_LOCAL, reservedContainer, rackLocalResourceRequest, NodeType.RACK_LOCAL, reservedContainer,
allocatedContainer, schedulingMode); allocatedContainer, schedulingMode, currentResoureLimits);
} }
return new CSAssignment(Resources.none(), NodeType.RACK_LOCAL); return new CSAssignment(Resources.none(), NodeType.RACK_LOCAL);
@ -1441,12 +1410,12 @@ public class LeafQueue extends AbstractCSQueue {
ResourceRequest offSwitchResourceRequest, FiCaSchedulerNode node, ResourceRequest offSwitchResourceRequest, FiCaSchedulerNode node,
FiCaSchedulerApp application, Priority priority, FiCaSchedulerApp application, Priority priority,
RMContainer reservedContainer, MutableObject allocatedContainer, RMContainer reservedContainer, MutableObject allocatedContainer,
SchedulingMode schedulingMode) { SchedulingMode schedulingMode, ResourceLimits currentResoureLimits) {
if (canAssign(application, priority, node, NodeType.OFF_SWITCH, if (canAssign(application, priority, node, NodeType.OFF_SWITCH,
reservedContainer)) { reservedContainer)) {
return assignContainer(clusterResource, node, application, priority, return assignContainer(clusterResource, node, application, priority,
offSwitchResourceRequest, NodeType.OFF_SWITCH, reservedContainer, offSwitchResourceRequest, NodeType.OFF_SWITCH, reservedContainer,
allocatedContainer, schedulingMode); allocatedContainer, schedulingMode, currentResoureLimits);
} }
return new CSAssignment(Resources.none(), NodeType.OFF_SWITCH); return new CSAssignment(Resources.none(), NodeType.OFF_SWITCH);
@ -1529,7 +1498,8 @@ public class LeafQueue extends AbstractCSQueue {
private CSAssignment assignContainer(Resource clusterResource, FiCaSchedulerNode node, private CSAssignment assignContainer(Resource clusterResource, FiCaSchedulerNode node,
FiCaSchedulerApp application, Priority priority, FiCaSchedulerApp application, Priority priority,
ResourceRequest request, NodeType type, RMContainer rmContainer, ResourceRequest request, NodeType type, RMContainer rmContainer,
MutableObject createdContainer, SchedulingMode schedulingMode) { MutableObject createdContainer, SchedulingMode schedulingMode,
ResourceLimits currentResoureLimits) {
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("assignContainers: node=" + node.getNodeName() LOG.debug("assignContainers: node=" + node.getNodeName()
+ " application=" + application.getApplicationId() + " application=" + application.getApplicationId()
@ -1573,13 +1543,17 @@ public class LeafQueue extends AbstractCSQueue {
LOG.warn("Couldn't get container for allocation!"); LOG.warn("Couldn't get container for allocation!");
return new CSAssignment(Resources.none(), type); return new CSAssignment(Resources.none(), type);
} }
boolean shouldAllocOrReserveNewContainer = shouldAllocOrReserveNewContainer( boolean shouldAllocOrReserveNewContainer = shouldAllocOrReserveNewContainer(
application, priority, capability); application, priority, capability);
// Can we allocate a container on this node? // Can we allocate a container on this node?
int availableContainers = int availableContainers =
resourceCalculator.computeAvailableContainers(available, capability); resourceCalculator.computeAvailableContainers(available, capability);
boolean needToUnreserve = Resources.greaterThan(resourceCalculator,clusterResource,
currentResoureLimits.getAmountNeededUnreserve(), Resources.none());
if (availableContainers > 0) { if (availableContainers > 0) {
// Allocate... // Allocate...
@ -1588,20 +1562,24 @@ public class LeafQueue extends AbstractCSQueue {
unreserve(application, priority, node, rmContainer); unreserve(application, priority, node, rmContainer);
} else if (this.reservationsContinueLooking && node.getLabels().isEmpty()) { } else if (this.reservationsContinueLooking && node.getLabels().isEmpty()) {
// when reservationsContinueLooking is set, we may need to unreserve // when reservationsContinueLooking is set, we may need to unreserve
// some containers to meet this queue and its parents' resource limits // some containers to meet this queue, its parents', or the users' resource limits.
// TODO, need change here when we want to support continuous reservation // TODO, need change here when we want to support continuous reservation
// looking for labeled partitions. // looking for labeled partitions.
Resource minimumUnreservedResource = if (!shouldAllocOrReserveNewContainer || needToUnreserve) {
getMinimumResourceNeedUnreserved(capability); // If we shouldn't allocate/reserve new container then we should unreserve one the same
if (!shouldAllocOrReserveNewContainer // size we are asking for since the currentResoureLimits.getAmountNeededUnreserve
|| Resources.greaterThan(resourceCalculator, clusterResource, // could be zero. If the limit was hit then use the amount we need to unreserve to be
minimumUnreservedResource, Resources.none())) { // under the limit.
Resource amountToUnreserve = capability;
if (needToUnreserve) {
amountToUnreserve = currentResoureLimits.getAmountNeededUnreserve();
}
boolean containerUnreserved = boolean containerUnreserved =
findNodeToUnreserve(clusterResource, node, application, priority, findNodeToUnreserve(clusterResource, node, application, priority,
capability, minimumUnreservedResource); amountToUnreserve);
// When (minimum-unreserved-resource > 0 OR we cannot allocate new/reserved // When (minimum-unreserved-resource > 0 OR we cannot allocate new/reserved
// container (That means we *have to* unreserve some resource to // container (That means we *have to* unreserve some resource to
// continue)). If we failed to unreserve some resource, // continue)). If we failed to unreserve some resource, we can't continue.
if (!containerUnreserved) { if (!containerUnreserved) {
return new CSAssignment(Resources.none(), type); return new CSAssignment(Resources.none(), type);
} }
@ -1642,13 +1620,13 @@ public class LeafQueue extends AbstractCSQueue {
if (shouldAllocOrReserveNewContainer || rmContainer != null) { if (shouldAllocOrReserveNewContainer || rmContainer != null) {
if (reservationsContinueLooking && rmContainer == null) { if (reservationsContinueLooking && rmContainer == null) {
// we could possibly ignoring parent queue capacity limits when // we could possibly ignoring queue capacity or user limits when
// reservationsContinueLooking is set. // reservationsContinueLooking is set. Make sure we didn't need to unreserve
// If we're trying to reserve a container here, not container will be // one.
// unreserved for reserving the new one. Check limits again before if (needToUnreserve) {
// reserve the new container if (LOG.isDebugEnabled()) {
if (!checkLimitsToReserve(clusterResource, LOG.debug("we needed to unreserve to be able to allocate");
application, capability, node.getPartition(), schedulingMode)) { }
return new CSAssignment(Resources.none(), type); return new CSAssignment(Resources.none(), type);
} }
} }
@ -1811,14 +1789,14 @@ public class LeafQueue extends AbstractCSQueue {
// Even if ParentQueue will set limits respect child's max queue capacity, // Even if ParentQueue will set limits respect child's max queue capacity,
// but when allocating reserved container, CapacityScheduler doesn't do // but when allocating reserved container, CapacityScheduler doesn't do
// this. So need cap limits by queue's max capacity here. // this. So need cap limits by queue's max capacity here.
this.currentResourceLimits = currentResourceLimits; this.cachedResourceLimitsForHeadroom = new ResourceLimits(currentResourceLimits.getLimit());
Resource queueMaxResource = Resource queueMaxResource =
Resources.multiplyAndNormalizeDown(resourceCalculator, labelManager Resources.multiplyAndNormalizeDown(resourceCalculator, labelManager
.getResourceByLabel(RMNodeLabelsManager.NO_LABEL, clusterResource), .getResourceByLabel(RMNodeLabelsManager.NO_LABEL, clusterResource),
queueCapacities queueCapacities
.getAbsoluteMaximumCapacity(RMNodeLabelsManager.NO_LABEL), .getAbsoluteMaximumCapacity(RMNodeLabelsManager.NO_LABEL),
minimumAllocation); minimumAllocation);
this.currentResourceLimits.setLimit(Resources.min(resourceCalculator, this.cachedResourceLimitsForHeadroom.setLimit(Resources.min(resourceCalculator,
clusterResource, queueMaxResource, currentResourceLimits.getLimit())); clusterResource, queueMaxResource, currentResourceLimits.getLimit()));
} }

View File

@ -748,14 +748,14 @@ public class TestReservations {
// nothing reserved // nothing reserved
boolean res = a.findNodeToUnreserve(csContext.getClusterResource(), boolean res = a.findNodeToUnreserve(csContext.getClusterResource(),
node_1, app_0, priorityMap, capability, capability); node_1, app_0, priorityMap, capability);
assertFalse(res); assertFalse(res);
// reserved but scheduler doesn't know about that node. // reserved but scheduler doesn't know about that node.
app_0.reserve(node_1, priorityMap, rmContainer, container); app_0.reserve(node_1, priorityMap, rmContainer, container);
node_1.reserveResource(app_0, priorityMap, rmContainer); node_1.reserveResource(app_0, priorityMap, rmContainer);
res = a.findNodeToUnreserve(csContext.getClusterResource(), node_1, app_0, res = a.findNodeToUnreserve(csContext.getClusterResource(), node_1, app_0,
priorityMap, capability, capability); priorityMap, capability);
assertFalse(res); assertFalse(res);
} }
@ -858,12 +858,13 @@ public class TestReservations {
// allocate to queue so that the potential new capacity is greater then // allocate to queue so that the potential new capacity is greater then
// absoluteMaxCapacity // absoluteMaxCapacity
Resource capability = Resources.createResource(32 * GB, 0); Resource capability = Resources.createResource(32 * GB, 0);
ResourceLimits limits = new ResourceLimits(clusterResource);
boolean res = boolean res =
a.canAssignToThisQueue(clusterResource, a.canAssignToThisQueue(clusterResource,
RMNodeLabelsManager.NO_LABEL, new ResourceLimits( RMNodeLabelsManager.NO_LABEL, limits, capability, Resources.none(),
clusterResource), capability, Resources.none(),
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
assertFalse(res); assertFalse(res);
assertEquals(limits.getAmountNeededUnreserve(), Resources.none());
// now add in reservations and make sure it continues if config set // now add in reservations and make sure it continues if config set
// allocate to queue so that the potential new capacity is greater then // allocate to queue so that the potential new capacity is greater then
@ -880,38 +881,43 @@ public class TestReservations {
assertEquals(3 * GB, node_1.getUsedResource().getMemory()); assertEquals(3 * GB, node_1.getUsedResource().getMemory());
capability = Resources.createResource(5 * GB, 0); capability = Resources.createResource(5 * GB, 0);
limits = new ResourceLimits(clusterResource);
res = res =
a.canAssignToThisQueue(clusterResource, a.canAssignToThisQueue(clusterResource,
RMNodeLabelsManager.NO_LABEL, new ResourceLimits( RMNodeLabelsManager.NO_LABEL, limits, capability, Resources.createResource(5 * GB),
clusterResource), capability, Resources.createResource(5 * GB),
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
assertTrue(res); assertTrue(res);
// 16GB total, 13GB consumed (8 allocated, 5 reserved). asking for 5GB so we would have to
// unreserve 2GB to get the total 5GB needed.
// also note vcore checks not enabled
assertEquals(Resources.createResource(2 * GB, 3), limits.getAmountNeededUnreserve());
// tell to not check reservations // tell to not check reservations
limits = new ResourceLimits(clusterResource);
res = res =
a.canAssignToThisQueue(clusterResource, a.canAssignToThisQueue(clusterResource,
RMNodeLabelsManager.NO_LABEL, new ResourceLimits( RMNodeLabelsManager.NO_LABEL,limits, capability, Resources.none(),
clusterResource), capability, Resources.none(),
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
assertFalse(res); assertFalse(res);
assertEquals(Resources.none(), limits.getAmountNeededUnreserve());
refreshQueuesTurnOffReservationsContLook(a, csConf); refreshQueuesTurnOffReservationsContLook(a, csConf);
// should return false no matter what checkReservations is passed // should return false since reservations continue look is off.
// in since feature is off limits = new ResourceLimits(clusterResource);
res = res =
a.canAssignToThisQueue(clusterResource, a.canAssignToThisQueue(clusterResource,
RMNodeLabelsManager.NO_LABEL, new ResourceLimits( RMNodeLabelsManager.NO_LABEL, limits, capability, Resources.none(),
clusterResource), capability, Resources.none(),
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
assertFalse(res); assertFalse(res);
assertEquals(limits.getAmountNeededUnreserve(), Resources.none());
limits = new ResourceLimits(clusterResource);
res = res =
a.canAssignToThisQueue(clusterResource, a.canAssignToThisQueue(clusterResource,
RMNodeLabelsManager.NO_LABEL, new ResourceLimits( RMNodeLabelsManager.NO_LABEL, limits, capability, Resources.createResource(5 * GB),
clusterResource), capability, Resources.createResource(5 * GB),
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
assertFalse(res); assertFalse(res);
assertEquals(Resources.none(), limits.getAmountNeededUnreserve());
} }
public void refreshQueuesTurnOffReservationsContLook(LeafQueue a, public void refreshQueuesTurnOffReservationsContLook(LeafQueue a,
@ -1059,22 +1065,33 @@ public class TestReservations {
assertEquals(5 * GB, node_0.getUsedResource().getMemory()); assertEquals(5 * GB, node_0.getUsedResource().getMemory());
assertEquals(3 * GB, node_1.getUsedResource().getMemory()); assertEquals(3 * GB, node_1.getUsedResource().getMemory());
// set limit so subtrace reservations it can continue // not over the limit
Resource limit = Resources.createResource(12 * GB, 0); Resource limit = Resources.createResource(14 * GB, 0);
boolean res = a.canAssignToUser(clusterResource, user_0, limit, app_0, ResourceLimits userResourceLimits = new ResourceLimits(clusterResource);
true, ""); boolean res = a.canAssignToUser(clusterResource, user_0, limit, app_0, "", userResourceLimits);
assertTrue(res); assertTrue(res);
assertEquals(Resources.none(), userResourceLimits.getAmountNeededUnreserve());
// tell it not to check for reservations and should fail as already over
// limit // set limit so it subtracts reservations and it can continue
res = a.canAssignToUser(clusterResource, user_0, limit, app_0, false, ""); limit = Resources.createResource(12 * GB, 0);
assertFalse(res); userResourceLimits = new ResourceLimits(clusterResource);
res = a.canAssignToUser(clusterResource, user_0, limit, app_0,
"", userResourceLimits);
assertTrue(res);
// limit set to 12GB, we are using 13GB (8 allocated, 5 reserved), to get under limit
// we need to unreserve 1GB
// also note vcore checks not enabled
assertEquals(Resources.createResource(1 * GB, 4),
userResourceLimits.getAmountNeededUnreserve());
refreshQueuesTurnOffReservationsContLook(a, csConf); refreshQueuesTurnOffReservationsContLook(a, csConf);
userResourceLimits = new ResourceLimits(clusterResource);
// should now return false since feature off // should now return false since feature off
res = a.canAssignToUser(clusterResource, user_0, limit, app_0, true, ""); res = a.canAssignToUser(clusterResource, user_0, limit, app_0, "", userResourceLimits);
assertFalse(res); assertFalse(res);
assertEquals(Resources.none(), userResourceLimits.getAmountNeededUnreserve());
} }
@Test @Test