YARN-1769. CapacityScheduler: Improve reservations. Contributed by Thomas Graves

This commit is contained in:
Jason Lowe 2014-09-29 14:12:18 +00:00
parent b38e52b5e8
commit 9c22065109
14 changed files with 1760 additions and 229 deletions

View File

@ -265,6 +265,9 @@ Release 2.6.0 - UNRELEASED
YARN-668. Changed NMTokenIdentifier/AMRMTokenIdentifier/ContainerTokenIdentifier
to use protobuf object as the payload. (Junping Du via jianhe)
YARN-1769. CapacityScheduler: Improve reservations (Thomas Graves via
jlowe)
OPTIMIZATIONS
BUG FIXES

View File

@ -351,4 +351,16 @@
<Class name="org.apache.hadoop.yarn.util.ApplicationClassLoader"/>
<Bug pattern="NM_SAME_SIMPLE_NAME_AS_SUPERCLASS"/>
</Match>
<!-- It is only changed on re-initialization the warnings are for access from a test function. -->
<Match>
<Class name="org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue" />
<Field name="reservationsContinueLooking" />
<Bug pattern="IS2_INCONSISTENT_SYNC" />
</Match>
<Match>
<Class name="org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQueue" />
<Field name="reservationsContinueLooking" />
<Bug pattern="IS2_INCONSISTENT_SYNC" />
</Match>
</FindBugsFilter>

View File

@ -184,10 +184,11 @@ extends org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue {
* Assign containers to applications in the queue or it's children (if any).
* @param clusterResource the resource of the cluster.
* @param node node on which resources are available
* @param needToUnreserve assign container only if it can unreserve one first
* @return the assignment
*/
public CSAssignment assignContainers(
Resource clusterResource, FiCaSchedulerNode node);
Resource clusterResource, FiCaSchedulerNode node, boolean needToUnreserve);
/**
* A container assigned to the queue has completed.
@ -200,11 +201,13 @@ extends org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue {
* container
* @param childQueue <code>CSQueue</code> to reinsert in childQueues
* @param event event to be sent to the container
* @param sortQueues indicates whether it should re-sort the queues
*/
public void completedContainer(Resource clusterResource,
FiCaSchedulerApp application, FiCaSchedulerNode node,
RMContainer container, ContainerStatus containerStatus,
RMContainerEventType event, CSQueue childQueue);
RMContainerEventType event, CSQueue childQueue,
boolean sortQueues);
/**
* Get the number of applications in the queue.

View File

@ -516,13 +516,13 @@ public class CapacityScheduler extends
"Queue configuration missing child queue names for " + queueName);
}
queue =
new LeafQueue(csContext, queueName, parent,oldQueues.get(queueName));
new LeafQueue(csContext, queueName, parent, oldQueues.get(queueName));
// Used only for unit tests
queue = hook.hook(queue);
} else {
ParentQueue parentQueue =
new ParentQueue(csContext, queueName, parent,oldQueues.get(queueName));
new ParentQueue(csContext, queueName, parent, oldQueues.get(queueName));
// Used only for unit tests
queue = hook.hook(parentQueue);
@ -922,7 +922,8 @@ public class CapacityScheduler extends
node.getNodeID());
LeafQueue queue = ((LeafQueue)reservedApplication.getQueue());
CSAssignment assignment = queue.assignContainers(clusterResource, node);
CSAssignment assignment = queue.assignContainers(clusterResource, node,
false);
RMContainer excessReservation = assignment.getExcessReservation();
if (excessReservation != null) {
@ -933,7 +934,7 @@ public class CapacityScheduler extends
SchedulerUtils.createAbnormalContainerStatus(
container.getId(),
SchedulerUtils.UNRESERVED_CONTAINER),
RMContainerEventType.RELEASED, null);
RMContainerEventType.RELEASED, null, true);
}
}
@ -946,7 +947,7 @@ public class CapacityScheduler extends
LOG.debug("Trying to schedule on node: " + node.getNodeName() +
", available: " + node.getAvailableResource());
}
root.assignContainers(clusterResource, node);
root.assignContainers(clusterResource, node, false);
}
} else {
LOG.info("Skipping scheduling since node " + node.getNodeID() +
@ -1122,7 +1123,7 @@ public class CapacityScheduler extends
// Inform the queue
LeafQueue queue = (LeafQueue)application.getQueue();
queue.completedContainer(clusterResource, application, node,
rmContainer, containerStatus, event, null);
rmContainer, containerStatus, event, null, true);
LOG.info("Application attempt " + application.getApplicationAttemptId()
+ " released container " + container.getId() + " on node: " + node
@ -1138,7 +1139,7 @@ public class CapacityScheduler extends
}
@Lock(Lock.NoLock.class)
FiCaSchedulerNode getNode(NodeId nodeId) {
public FiCaSchedulerNode getNode(NodeId nodeId) {
return nodes.get(nodeId);
}

View File

@ -81,6 +81,13 @@ public class CapacitySchedulerConfiguration extends Configuration {
@Private
public static final String STATE = "state";
@Private
public static final String RESERVE_CONT_LOOK_ALL_NODES = PREFIX
+ "reservations-continue-look-all-nodes";
@Private
public static final boolean DEFAULT_RESERVE_CONT_LOOK_ALL_NODES = true;
@Private
public static final int DEFAULT_MAXIMUM_SYSTEM_APPLICATIIONS = 10000;
@ -308,6 +315,17 @@ public class CapacitySchedulerConfiguration extends Configuration {
QueueState.valueOf(state.toUpperCase()) : QueueState.RUNNING;
}
/*
* Returns whether we should continue to look at all heart beating nodes even
* after the reservation limit was hit. The node heart beating in could
* satisfy the request thus could be a better pick then waiting for the
* reservation to be fullfilled. This config is refreshable.
*/
public boolean getReservationContinueLook() {
return getBoolean(RESERVE_CONT_LOOK_ALL_NODES,
DEFAULT_RESERVE_CONT_LOOK_ALL_NODES);
}
private static String getAclKey(QueueACL acl) {
return "acl_" + acl.toString().toLowerCase();
}

View File

@ -21,10 +21,12 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import java.util.Comparator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
/**
@ -55,4 +57,6 @@ public interface CapacitySchedulerContext {
ResourceCalculator getResourceCalculator();
Comparator<CSQueue> getQueueComparator();
FiCaSchedulerNode getNode(NodeId nodeId);
}

View File

@ -59,6 +59,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManage
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
@ -129,6 +130,8 @@ public class LeafQueue implements CSQueue {
private final ResourceCalculator resourceCalculator;
private boolean reservationsContinueLooking;
public LeafQueue(CapacitySchedulerContext cs,
String queueName, CSQueue parent, CSQueue old) {
this.scheduler = cs;
@ -202,8 +205,9 @@ public class LeafQueue implements CSQueue {
maximumCapacity, absoluteMaxCapacity,
userLimit, userLimitFactor,
maxApplications, maxAMResourcePerQueuePercent, maxApplicationsPerUser,
maxActiveApplications, maxActiveApplicationsPerUser, state, acls, cs
.getConfiguration().getNodeLocalityDelay());
maxActiveApplications, maxActiveApplicationsPerUser, state, acls,
cs.getConfiguration().getNodeLocalityDelay(),
cs.getConfiguration().getReservationContinueLook());
if(LOG.isDebugEnabled()) {
LOG.debug("LeafQueue:" + " name=" + queueName
@ -225,7 +229,8 @@ public class LeafQueue implements CSQueue {
int maxApplications, float maxAMResourcePerQueuePercent,
int maxApplicationsPerUser, int maxActiveApplications,
int maxActiveApplicationsPerUser, QueueState state,
Map<QueueACL, AccessControlList> acls, int nodeLocalityDelay)
Map<QueueACL, AccessControlList> acls, int nodeLocalityDelay,
boolean continueLooking)
{
// Sanity check
CSQueueUtils.checkMaxCapacity(getQueueName(), capacity, maximumCapacity);
@ -257,6 +262,7 @@ public class LeafQueue implements CSQueue {
this.queueInfo.setQueueState(this.state);
this.nodeLocalityDelay = nodeLocalityDelay;
this.reservationsContinueLooking = continueLooking;
StringBuilder aclsString = new StringBuilder();
for (Map.Entry<QueueACL, AccessControlList> e : acls.entrySet()) {
@ -321,7 +327,9 @@ public class LeafQueue implements CSQueue {
" [= configuredState ]" + "\n" +
"acls = " + aclsString +
" [= configuredAcls ]" + "\n" +
"nodeLocalityDelay = " + nodeLocalityDelay + "\n");
"nodeLocalityDelay = " + nodeLocalityDelay + "\n" +
"reservationsContinueLooking = " +
reservationsContinueLooking + "\n");
}
@Override
@ -555,6 +563,11 @@ public class LeafQueue implements CSQueue {
return nodeLocalityDelay;
}
@Private
boolean getReservationContinueLooking() {
return reservationsContinueLooking;
}
public String toString() {
return queueName + ": " +
"capacity=" + capacity + ", " +
@ -613,7 +626,8 @@ public class LeafQueue implements CSQueue {
newlyParsedLeafQueue.getMaximumActiveApplications(),
newlyParsedLeafQueue.getMaximumActiveApplicationsPerUser(),
newlyParsedLeafQueue.state, newlyParsedLeafQueue.acls,
newlyParsedLeafQueue.getNodeLocalityDelay());
newlyParsedLeafQueue.getNodeLocalityDelay(),
newlyParsedLeafQueue.reservationsContinueLooking);
// queue metrics are updated, more resource may be available
// activate the pending applications if possible
@ -802,8 +816,8 @@ public class LeafQueue implements CSQueue {
private static final CSAssignment SKIP_ASSIGNMENT = new CSAssignment(true);
@Override
public synchronized CSAssignment
assignContainers(Resource clusterResource, FiCaSchedulerNode node) {
public synchronized CSAssignment assignContainers(Resource clusterResource,
FiCaSchedulerNode node, boolean needToUnreserve) {
if(LOG.isDebugEnabled()) {
LOG.debug("assignContainers: node=" + node.getNodeName()
@ -848,9 +862,17 @@ public class LeafQueue implements CSQueue {
Resource required = anyRequest.getCapability();
// Do we need containers at this 'priority'?
if (!needContainers(application, priority, required)) {
if (application.getTotalRequiredResources(priority) <= 0) {
continue;
}
if (!this.reservationsContinueLooking) {
if (!needContainers(application, priority, required)) {
if (LOG.isDebugEnabled()) {
LOG.debug("doesn't need containers based on reservation algo!");
}
continue;
}
}
// Compute user-limit & set headroom
// Note: We compute both user-limit & headroom with the highest
@ -862,14 +884,14 @@ public class LeafQueue implements CSQueue {
required);
// Check queue max-capacity limit
if (!assignToQueue(clusterResource, required)) {
if (!assignToQueue(clusterResource, required, application, true)) {
return NULL_ASSIGNMENT;
}
// Check user limit
if (!assignToUser(
clusterResource, application.getUser(), userLimit)) {
break;
if (!assignToUser(clusterResource, application.getUser(), userLimit,
application, true)) {
break;
}
// Inform the application it is about to get a scheduling opportunity
@ -878,7 +900,7 @@ public class LeafQueue implements CSQueue {
// Try to schedule
CSAssignment assignment =
assignContainersOnNode(clusterResource, node, application, priority,
null);
null, needToUnreserve);
// Did the application skip this node?
if (assignment.getSkipped()) {
@ -900,6 +922,9 @@ public class LeafQueue implements CSQueue {
// otherwise the app will be delayed for each non-local assignment.
// This helps apps with many off-cluster requests schedule faster.
if (assignment.getType() != NodeType.OFF_SWITCH) {
if (LOG.isDebugEnabled()) {
LOG.debug("Resetting scheduling opportunities");
}
application.resetSchedulingOpportunities(priority);
}
@ -935,22 +960,57 @@ public class LeafQueue implements CSQueue {
// Try to assign if we have sufficient resources
assignContainersOnNode(clusterResource, node, application, priority,
rmContainer);
rmContainer, false);
// Doesn't matter... since it's already charged for at time of reservation
// "re-reservation" is *free*
return new CSAssignment(Resources.none(), NodeType.NODE_LOCAL);
}
private synchronized boolean assignToQueue(Resource clusterResource,
Resource required) {
@Private
protected synchronized boolean assignToQueue(Resource clusterResource,
Resource required, FiCaSchedulerApp application,
boolean checkReservations) {
Resource potentialTotalResource = Resources.add(usedResources, required);
// Check how of the cluster's absolute capacity we are currently using...
float potentialNewCapacity =
Resources.divide(
resourceCalculator, clusterResource,
Resources.add(usedResources, required),
clusterResource);
float potentialNewCapacity = Resources.divide(resourceCalculator,
clusterResource, potentialTotalResource, clusterResource);
if (potentialNewCapacity > absoluteMaxCapacity) {
// if enabled, check to see if could we potentially use this node instead
// of a reserved node if the application has reserved containers
if (this.reservationsContinueLooking && checkReservations) {
float potentialNewWithoutReservedCapacity = Resources.divide(
resourceCalculator,
clusterResource,
Resources.subtract(potentialTotalResource,
application.getCurrentReservation()),
clusterResource);
if (potentialNewWithoutReservedCapacity <= absoluteMaxCapacity) {
if (LOG.isDebugEnabled()) {
LOG.debug("try to use reserved: "
+ getQueueName()
+ " usedResources: "
+ usedResources
+ " clusterResources: "
+ clusterResource
+ " reservedResources: "
+ application.getCurrentReservation()
+ " currentCapacity "
+ Resources.divide(resourceCalculator, clusterResource,
usedResources, clusterResource) + " required " + required
+ " potentialNewWithoutReservedCapacity: "
+ potentialNewWithoutReservedCapacity + " ( " + " max-capacity: "
+ absoluteMaxCapacity + ")");
}
// we could potentially use this node instead of reserved node
return true;
}
}
if (LOG.isDebugEnabled()) {
LOG.debug(getQueueName()
+ " usedResources: " + usedResources
@ -966,6 +1026,8 @@ public class LeafQueue implements CSQueue {
return true;
}
@Lock({LeafQueue.class, FiCaSchedulerApp.class})
private Resource computeUserLimitAndSetHeadroom(
FiCaSchedulerApp application, Resource clusterResource, Resource required) {
@ -1085,25 +1147,43 @@ public class LeafQueue implements CSQueue {
return limit;
}
private synchronized boolean assignToUser(Resource clusterResource,
String userName, Resource limit) {
@Private
protected synchronized boolean assignToUser(Resource clusterResource,
String userName, Resource limit, FiCaSchedulerApp application,
boolean checkReservations) {
User user = getUser(userName);
// Note: We aren't considering the current request since there is a fixed
// overhead of the AM, but it's a > check, not a >= check, so...
if (Resources.greaterThan(resourceCalculator, clusterResource,
user.getConsumedResources(), limit)) {
if (Resources.greaterThan(resourceCalculator, clusterResource,
user.getConsumedResources(), limit)) {
// if enabled, check to see if could we potentially use this node instead
// of a reserved node if the application has reserved containers
if (this.reservationsContinueLooking && checkReservations) {
if (Resources.lessThanOrEqual(
resourceCalculator,
clusterResource,
Resources.subtract(user.getConsumedResources(),
application.getCurrentReservation()), limit)) {
if (LOG.isDebugEnabled()) {
LOG.debug("User " + userName + " in queue " + getQueueName()
+ " will exceed limit based on reservations - " + " consumed: "
+ user.getConsumedResources() + " reserved: "
+ application.getCurrentReservation() + " limit: " + limit);
}
return true;
}
}
if (LOG.isDebugEnabled()) {
LOG.debug("User " + userName + " in queue " + getQueueName() +
" will exceed limit - " +
" consumed: " + user.getConsumedResources() +
" limit: " + limit
);
LOG.debug("User " + userName + " in queue " + getQueueName()
+ " will exceed limit - " + " consumed: "
+ user.getConsumedResources() + " limit: " + limit);
}
return false;
}
return true;
}
@ -1139,7 +1219,7 @@ public class LeafQueue implements CSQueue {
private CSAssignment assignContainersOnNode(Resource clusterResource,
FiCaSchedulerNode node, FiCaSchedulerApp application,
Priority priority, RMContainer reservedContainer) {
Priority priority, RMContainer reservedContainer, boolean needToUnreserve) {
Resource assigned = Resources.none();
@ -1149,7 +1229,7 @@ public class LeafQueue implements CSQueue {
if (nodeLocalResourceRequest != null) {
assigned =
assignNodeLocalContainers(clusterResource, nodeLocalResourceRequest,
node, application, priority, reservedContainer);
node, application, priority, reservedContainer, needToUnreserve);
if (Resources.greaterThan(resourceCalculator, clusterResource,
assigned, Resources.none())) {
return new CSAssignment(assigned, NodeType.NODE_LOCAL);
@ -1166,7 +1246,7 @@ public class LeafQueue implements CSQueue {
assigned =
assignRackLocalContainers(clusterResource, rackLocalResourceRequest,
node, application, priority, reservedContainer);
node, application, priority, reservedContainer, needToUnreserve);
if (Resources.greaterThan(resourceCalculator, clusterResource,
assigned, Resources.none())) {
return new CSAssignment(assigned, NodeType.RACK_LOCAL);
@ -1183,21 +1263,99 @@ public class LeafQueue implements CSQueue {
return new CSAssignment(
assignOffSwitchContainers(clusterResource, offSwitchResourceRequest,
node, application, priority, reservedContainer),
node, application, priority, reservedContainer, needToUnreserve),
NodeType.OFF_SWITCH);
}
return SKIP_ASSIGNMENT;
}
private Resource assignNodeLocalContainers(
Resource clusterResource, ResourceRequest nodeLocalResourceRequest,
FiCaSchedulerNode node, FiCaSchedulerApp application,
Priority priority, RMContainer reservedContainer) {
@Private
protected boolean findNodeToUnreserve(Resource clusterResource,
FiCaSchedulerNode node, FiCaSchedulerApp application, Priority priority,
Resource capability) {
// need to unreserve some other container first
NodeId idToUnreserve = application.getNodeIdToUnreserve(priority, capability);
if (idToUnreserve == null) {
if (LOG.isDebugEnabled()) {
LOG.debug("checked to see if could unreserve for app but nothing "
+ "reserved that matches for this app");
}
return false;
}
FiCaSchedulerNode nodeToUnreserve = scheduler.getNode(idToUnreserve);
if (nodeToUnreserve == null) {
LOG.error("node to unreserve doesn't exist, nodeid: " + idToUnreserve);
return false;
}
if (LOG.isDebugEnabled()) {
LOG.debug("unreserving for app: " + application.getApplicationId()
+ " on nodeId: " + idToUnreserve
+ " in order to replace reserved application and place it on node: "
+ node.getNodeID() + " needing: " + capability);
}
// headroom
Resources.addTo(application.getHeadroom(), nodeToUnreserve
.getReservedContainer().getReservedResource());
// Make sure to not have completedContainers sort the queues here since
// we are already inside an iterator loop for the queues and this would
// cause an concurrent modification exception.
completedContainer(clusterResource, application, nodeToUnreserve,
nodeToUnreserve.getReservedContainer(),
SchedulerUtils.createAbnormalContainerStatus(nodeToUnreserve
.getReservedContainer().getContainerId(),
SchedulerUtils.UNRESERVED_CONTAINER),
RMContainerEventType.RELEASED, null, false);
return true;
}
@Private
protected boolean checkLimitsToReserve(Resource clusterResource,
FiCaSchedulerApp application, Resource capability,
boolean needToUnreserve) {
if (needToUnreserve) {
if (LOG.isDebugEnabled()) {
LOG.debug("we needed to unreserve to be able to allocate");
}
return false;
}
// we can't reserve if we got here based on the limit
// checks assuming we could unreserve!!!
Resource userLimit = computeUserLimitAndSetHeadroom(application,
clusterResource, capability);
// Check queue max-capacity limit
if (!assignToQueue(clusterResource, capability, application, false)) {
if (LOG.isDebugEnabled()) {
LOG.debug("was going to reserve but hit queue limit");
}
return false;
}
// Check user limit
if (!assignToUser(clusterResource, application.getUser(), userLimit,
application, false)) {
if (LOG.isDebugEnabled()) {
LOG.debug("was going to reserve but hit user limit");
}
return false;
}
return true;
}
private Resource assignNodeLocalContainers(Resource clusterResource,
ResourceRequest nodeLocalResourceRequest, FiCaSchedulerNode node,
FiCaSchedulerApp application, Priority priority,
RMContainer reservedContainer, boolean needToUnreserve) {
if (canAssign(application, priority, node, NodeType.NODE_LOCAL,
reservedContainer)) {
return assignContainer(clusterResource, node, application, priority,
nodeLocalResourceRequest, NodeType.NODE_LOCAL, reservedContainer);
return assignContainer(clusterResource, node, application, priority,
nodeLocalResourceRequest, NodeType.NODE_LOCAL, reservedContainer,
needToUnreserve);
}
return Resources.none();
@ -1206,11 +1364,12 @@ public class LeafQueue implements CSQueue {
private Resource assignRackLocalContainers(
Resource clusterResource, ResourceRequest rackLocalResourceRequest,
FiCaSchedulerNode node, FiCaSchedulerApp application, Priority priority,
RMContainer reservedContainer) {
RMContainer reservedContainer, boolean needToUnreserve) {
if (canAssign(application, priority, node, NodeType.RACK_LOCAL,
reservedContainer)) {
return assignContainer(clusterResource, node, application, priority,
rackLocalResourceRequest, NodeType.RACK_LOCAL, reservedContainer);
return assignContainer(clusterResource, node, application, priority,
rackLocalResourceRequest, NodeType.RACK_LOCAL, reservedContainer,
needToUnreserve);
}
return Resources.none();
@ -1219,11 +1378,12 @@ public class LeafQueue implements CSQueue {
private Resource assignOffSwitchContainers(
Resource clusterResource, ResourceRequest offSwitchResourceRequest,
FiCaSchedulerNode node, FiCaSchedulerApp application, Priority priority,
RMContainer reservedContainer) {
RMContainer reservedContainer, boolean needToUnreserve) {
if (canAssign(application, priority, node, NodeType.OFF_SWITCH,
reservedContainer)) {
return assignContainer(clusterResource, node, application, priority,
offSwitchResourceRequest, NodeType.OFF_SWITCH, reservedContainer);
return assignContainer(clusterResource, node, application, priority,
offSwitchResourceRequest, NodeType.OFF_SWITCH, reservedContainer,
needToUnreserve);
}
return Resources.none();
@ -1303,14 +1463,17 @@ public class LeafQueue implements CSQueue {
return container;
}
private Resource assignContainer(Resource clusterResource, FiCaSchedulerNode node,
FiCaSchedulerApp application, Priority priority,
ResourceRequest request, NodeType type, RMContainer rmContainer) {
ResourceRequest request, NodeType type, RMContainer rmContainer,
boolean needToUnreserve) {
if (LOG.isDebugEnabled()) {
LOG.debug("assignContainers: node=" + node.getNodeName()
+ " application=" + application.getApplicationId()
+ " priority=" + priority.getPriority()
+ " request=" + request + " type=" + type);
+ " request=" + request + " type=" + type
+ " needToUnreserve= " + needToUnreserve);
}
Resource capability = request.getCapability();
Resource available = node.getAvailableResource();
@ -1335,6 +1498,18 @@ public class LeafQueue implements CSQueue {
return Resources.none();
}
// default to true since if reservation continue look feature isn't on
// needContainers is checked earlier and we wouldn't have gotten this far
boolean canAllocContainer = true;
if (this.reservationsContinueLooking) {
// based on reservations can we allocate/reserve more or do we need
// to unreserve one first
canAllocContainer = needContainers(application, priority, capability);
if (LOG.isDebugEnabled()) {
LOG.debug("can alloc container is: " + canAllocContainer);
}
}
// Can we allocate a container on this node?
int availableContainers =
resourceCalculator.computeAvailableContainers(available, capability);
@ -1342,8 +1517,28 @@ public class LeafQueue implements CSQueue {
// Allocate...
// Did we previously reserve containers at this 'priority'?
if (rmContainer != null){
if (rmContainer != null) {
unreserve(application, priority, node, rmContainer);
} else if (this.reservationsContinueLooking
&& (!canAllocContainer || needToUnreserve)) {
// need to unreserve some other container first
boolean res = findNodeToUnreserve(clusterResource, node, application,
priority, capability);
if (!res) {
return Resources.none();
}
} else {
// we got here by possibly ignoring queue capacity limits. If the
// parameter needToUnreserve is true it means we ignored one of those
// limits in the chance we could unreserve. If we are here we aren't
// trying to unreserve so we can't allocate anymore due to that parent
// limit.
if (needToUnreserve) {
if (LOG.isDebugEnabled()) {
LOG.debug("we needed to unreserve to be able to allocate, skipping");
}
return Resources.none();
}
}
// Inform the application
@ -1366,17 +1561,38 @@ public class LeafQueue implements CSQueue {
return container.getResource();
} else {
// Reserve by 'charging' in advance...
reserve(application, priority, node, rmContainer, container);
// if we are allowed to allocate but this node doesn't have space, reserve it or
// if this was an already a reserved container, reserve it again
if ((canAllocContainer) || (rmContainer != null)) {
LOG.info("Reserved container " +
" application attempt=" + application.getApplicationAttemptId() +
" resource=" + request.getCapability() +
" queue=" + this.toString() +
" node=" + node +
" clusterResource=" + clusterResource);
if (reservationsContinueLooking) {
// we got here by possibly ignoring parent queue capacity limits. If
// the parameter needToUnreserve is true it means we ignored one of
// those limits in the chance we could unreserve. If we are here
// we aren't trying to unreserve so we can't allocate
// anymore due to that parent limit
boolean res = checkLimitsToReserve(clusterResource, application, capability,
needToUnreserve);
if (!res) {
return Resources.none();
}
}
return request.getCapability();
// Reserve by 'charging' in advance...
reserve(application, priority, node, rmContainer, container);
LOG.info("Reserved container " +
" application=" + application.getApplicationId() +
" resource=" + request.getCapability() +
" queue=" + this.toString() +
" usedCapacity=" + getUsedCapacity() +
" absoluteUsedCapacity=" + getAbsoluteUsedCapacity() +
" used=" + usedResources +
" cluster=" + clusterResource);
return request.getCapability();
}
return Resources.none();
}
}
@ -1402,8 +1618,8 @@ public class LeafQueue implements CSQueue {
node.unreserveResource(application);
// Update reserved metrics
getMetrics().unreserveResource(
application.getUser(), rmContainer.getContainer().getResource());
getMetrics().unreserveResource(application.getUser(),
rmContainer.getContainer().getResource());
return true;
}
return false;
@ -1412,7 +1628,8 @@ public class LeafQueue implements CSQueue {
@Override
public void completedContainer(Resource clusterResource,
FiCaSchedulerApp application, FiCaSchedulerNode node, RMContainer rmContainer,
ContainerStatus containerStatus, RMContainerEventType event, CSQueue childQueue) {
ContainerStatus containerStatus, RMContainerEventType event, CSQueue childQueue,
boolean sortQueues) {
if (application != null) {
boolean removed = false;
@ -1449,7 +1666,7 @@ public class LeafQueue implements CSQueue {
if (removed) {
// Inform the parent queue _outside_ of the leaf-queue lock
getParent().completedContainer(clusterResource, application, node,
rmContainer, null, event, this);
rmContainer, null, event, this, sortQueues);
}
}
}
@ -1466,6 +1683,8 @@ public class LeafQueue implements CSQueue {
String userName = application.getUser();
User user = getUser(userName);
user.assignContainer(resource);
// Note this is a bit unconventional since it gets the object and modifies it here
// rather then using set routine
Resources.subtractFrom(application.getHeadroom(), resource); // headroom
metrics.setAvailableResourcesToUser(userName, application.getHeadroom());
@ -1585,7 +1804,7 @@ public class LeafQueue implements CSQueue {
public synchronized void releaseContainer(Resource resource) {
Resources.subtractFrom(consumed, resource);
}
}
}
@Override

View File

@ -100,6 +100,8 @@ public class ParentQueue implements CSQueue {
RecordFactoryProvider.getRecordFactory(null);
private final ResourceCalculator resourceCalculator;
private boolean reservationsContinueLooking;
public ParentQueue(CapacitySchedulerContext cs,
String queueName, CSQueue parent, CSQueue old) {
@ -146,7 +148,8 @@ public class ParentQueue implements CSQueue {
setupQueueConfigs(cs.getClusterResource(),
capacity, absoluteCapacity,
maximumCapacity, absoluteMaxCapacity, state, acls);
maximumCapacity, absoluteMaxCapacity, state, acls,
cs.getConfiguration().getReservationContinueLook());
this.queueComparator = cs.getQueueComparator();
this.childQueues = new TreeSet<CSQueue>(queueComparator);
@ -160,7 +163,8 @@ public class ParentQueue implements CSQueue {
Resource clusterResource,
float capacity, float absoluteCapacity,
float maximumCapacity, float absoluteMaxCapacity,
QueueState state, Map<QueueACL, AccessControlList> acls
QueueState state, Map<QueueACL, AccessControlList> acls,
boolean continueLooking
) {
// Sanity check
CSQueueUtils.checkMaxCapacity(getQueueName(), capacity, maximumCapacity);
@ -180,6 +184,8 @@ public class ParentQueue implements CSQueue {
this.queueInfo.setMaximumCapacity(this.maximumCapacity);
this.queueInfo.setQueueState(this.state);
this.reservationsContinueLooking = continueLooking;
StringBuilder aclsString = new StringBuilder();
for (Map.Entry<QueueACL, AccessControlList> e : acls.entrySet()) {
aclsString.append(e.getKey() + ":" + e.getValue().getAclString());
@ -195,7 +201,8 @@ public class ParentQueue implements CSQueue {
", maxCapacity=" + maximumCapacity +
", asboluteMaxCapacity=" + absoluteMaxCapacity +
", state=" + state +
", acls=" + aclsString);
", acls=" + aclsString +
", reservationsContinueLooking=" + reservationsContinueLooking);
}
private static float PRECISION = 0.0005f; // 0.05% precision
@ -383,7 +390,8 @@ public class ParentQueue implements CSQueue {
newlyParsedParentQueue.maximumCapacity,
newlyParsedParentQueue.absoluteMaxCapacity,
newlyParsedParentQueue.state,
newlyParsedParentQueue.acls);
newlyParsedParentQueue.acls,
newlyParsedParentQueue.reservationsContinueLooking);
// Re-configure existing child queues and add new ones
// The CS has already checked to ensure all existing child queues are present!
@ -551,7 +559,7 @@ public class ParentQueue implements CSQueue {
@Override
public synchronized CSAssignment assignContainers(
Resource clusterResource, FiCaSchedulerNode node) {
Resource clusterResource, FiCaSchedulerNode node, boolean needToUnreserve) {
CSAssignment assignment =
new CSAssignment(Resources.createResource(0, 0), NodeType.NODE_LOCAL);
@ -561,14 +569,19 @@ public class ParentQueue implements CSQueue {
+ getQueueName());
}
boolean localNeedToUnreserve = false;
// Are we over maximum-capacity for this queue?
if (!assignToQueue(clusterResource)) {
break;
// check to see if we could if we unreserve first
localNeedToUnreserve = assignToQueueIfUnreserve(clusterResource);
if (!localNeedToUnreserve) {
break;
}
}
// Schedule
CSAssignment assignedToChild =
assignContainersToChildQueues(clusterResource, node);
assignContainersToChildQueues(clusterResource, node, localNeedToUnreserve | needToUnreserve);
assignment.setType(assignedToChild.getType());
// Done if no child-queue assigned anything
@ -632,6 +645,39 @@ public class ParentQueue implements CSQueue {
return true;
}
private synchronized boolean assignToQueueIfUnreserve(Resource clusterResource) {
if (this.reservationsContinueLooking) {
// check to see if we could potentially use this node instead of a reserved
// node
Resource reservedResources = Resources.createResource(getMetrics()
.getReservedMB(), getMetrics().getReservedVirtualCores());
float capacityWithoutReservedCapacity = Resources.divide(
resourceCalculator, clusterResource,
Resources.subtract(usedResources, reservedResources),
clusterResource);
if (capacityWithoutReservedCapacity <= absoluteMaxCapacity) {
if (LOG.isDebugEnabled()) {
LOG.debug("parent: try to use reserved: " + getQueueName()
+ " usedResources: " + usedResources.getMemory()
+ " clusterResources: " + clusterResource.getMemory()
+ " reservedResources: " + reservedResources.getMemory()
+ " currentCapacity " + ((float) usedResources.getMemory())
/ clusterResource.getMemory()
+ " potentialNewWithoutReservedCapacity: "
+ capacityWithoutReservedCapacity + " ( " + " max-capacity: "
+ absoluteMaxCapacity + ")");
}
// we could potentially use this node instead of reserved node
return true;
}
}
return false;
}
private boolean canAssign(Resource clusterResource, FiCaSchedulerNode node) {
return (node.getReservedContainer() == null) &&
@ -640,7 +686,7 @@ public class ParentQueue implements CSQueue {
}
synchronized CSAssignment assignContainersToChildQueues(Resource cluster,
FiCaSchedulerNode node) {
FiCaSchedulerNode node, boolean needToUnreserve) {
CSAssignment assignment =
new CSAssignment(Resources.createResource(0, 0), NodeType.NODE_LOCAL);
@ -653,7 +699,7 @@ public class ParentQueue implements CSQueue {
LOG.debug("Trying to assign to queue: " + childQueue.getQueuePath()
+ " stats: " + childQueue);
}
assignment = childQueue.assignContainers(cluster, node);
assignment = childQueue.assignContainers(cluster, node, needToUnreserve);
if(LOG.isDebugEnabled()) {
LOG.debug("Assigned to queue: " + childQueue.getQueuePath() +
" stats: " + childQueue + " --> " +
@ -697,7 +743,8 @@ public class ParentQueue implements CSQueue {
public void completedContainer(Resource clusterResource,
FiCaSchedulerApp application, FiCaSchedulerNode node,
RMContainer rmContainer, ContainerStatus containerStatus,
RMContainerEventType event, CSQueue completedChildQueue) {
RMContainerEventType event, CSQueue completedChildQueue,
boolean sortQueues) {
if (application != null) {
// Careful! Locking order is important!
// Book keeping
@ -713,16 +760,21 @@ public class ParentQueue implements CSQueue {
" cluster=" + clusterResource);
}
// reinsert the updated queue
for (Iterator<CSQueue> iter=childQueues.iterator(); iter.hasNext();) {
CSQueue csqueue = iter.next();
if(csqueue.equals(completedChildQueue))
{
iter.remove();
LOG.info("Re-sorting completed queue: " + csqueue.getQueuePath() +
" stats: " + csqueue);
childQueues.add(csqueue);
break;
// Note that this is using an iterator on the childQueues so this can't be
// called if already within an iterator for the childQueues. Like
// from assignContainersToChildQueues.
if (sortQueues) {
// reinsert the updated queue
for (Iterator<CSQueue> iter=childQueues.iterator(); iter.hasNext();) {
CSQueue csqueue = iter.next();
if(csqueue.equals(completedChildQueue))
{
iter.remove();
LOG.info("Re-sorting completed queue: " + csqueue.getQueuePath() +
" stats: " + csqueue);
childQueues.add(csqueue);
break;
}
}
}
@ -730,10 +782,15 @@ public class ParentQueue implements CSQueue {
if (parent != null) {
// complete my parent
parent.completedContainer(clusterResource, application,
node, rmContainer, null, event, this);
node, rmContainer, null, event, this, sortQueues);
}
}
}
@Private
boolean getReservationContinueLooking() {
return reservationsContinueLooking;
}
synchronized void allocateResource(Resource clusterResource,
Resource resource) {

View File

@ -254,5 +254,32 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
currentContPreemption, Collections.singletonList(rr),
allocation.getNMTokenList());
}
synchronized public NodeId getNodeIdToUnreserve(Priority priority,
Resource capability) {
// first go around make this algorithm simple and just grab first
// reservation that has enough resources
Map<NodeId, RMContainer> reservedContainers = this.reservedContainers
.get(priority);
if ((reservedContainers != null) && (!reservedContainers.isEmpty())) {
for (Map.Entry<NodeId, RMContainer> entry : reservedContainers.entrySet()) {
// make sure we unreserve one with at least the same amount of
// resources, otherwise could affect capacity limits
if (Resources.fitsIn(capability, entry.getValue().getContainer()
.getResource())) {
if (LOG.isDebugEnabled()) {
LOG.debug("unreserving node with reservation size: "
+ entry.getValue().getContainer().getResource()
+ " in order to allocate container with size: " + capability);
}
return entry.getKey();
}
}
}
return null;
}
}

View File

@ -516,7 +516,7 @@ public class TestApplicationLimits {
app_0_0.updateResourceRequests(app_0_0_requests);
// Schedule to compute
queue.assignContainers(clusterResource, node_0);
queue.assignContainers(clusterResource, node_0, false);
Resource expectedHeadroom = Resources.createResource(10*16*GB, 1);
verify(app_0_0).setHeadroom(eq(expectedHeadroom));
@ -535,7 +535,7 @@ public class TestApplicationLimits {
app_0_1.updateResourceRequests(app_0_1_requests);
// Schedule to compute
queue.assignContainers(clusterResource, node_0); // Schedule to compute
queue.assignContainers(clusterResource, node_0, false); // Schedule to compute
verify(app_0_0, times(2)).setHeadroom(eq(expectedHeadroom));
verify(app_0_1).setHeadroom(eq(expectedHeadroom));// no change
@ -554,7 +554,7 @@ public class TestApplicationLimits {
app_1_0.updateResourceRequests(app_1_0_requests);
// Schedule to compute
queue.assignContainers(clusterResource, node_0); // Schedule to compute
queue.assignContainers(clusterResource, node_0, false); // Schedule to compute
expectedHeadroom = Resources.createResource(10*16*GB / 2, 1); // changes
verify(app_0_0).setHeadroom(eq(expectedHeadroom));
verify(app_0_1).setHeadroom(eq(expectedHeadroom));
@ -562,7 +562,7 @@ public class TestApplicationLimits {
// Now reduce cluster size and check for the smaller headroom
clusterResource = Resources.createResource(90*16*GB);
queue.assignContainers(clusterResource, node_0); // Schedule to compute
queue.assignContainers(clusterResource, node_0, false); // Schedule to compute
expectedHeadroom = Resources.createResource(9*16*GB / 2, 1); // changes
verify(app_0_0).setHeadroom(eq(expectedHeadroom));
verify(app_0_1).setHeadroom(eq(expectedHeadroom));

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import static org.junit.Assert.assertEquals;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doNothing;
@ -141,7 +142,7 @@ public class TestChildQueueOrder {
// Next call - nothing
if (allocation > 0) {
doReturn(new CSAssignment(Resources.none(), type)).
when(queue).assignContainers(eq(clusterResource), eq(node));
when(queue).assignContainers(eq(clusterResource), eq(node), anyBoolean());
// Mock the node's resource availability
Resource available = node.getAvailableResource();
@ -152,7 +153,7 @@ public class TestChildQueueOrder {
return new CSAssignment(allocatedResource, type);
}
}).
when(queue).assignContainers(eq(clusterResource), eq(node));
when(queue).assignContainers(eq(clusterResource), eq(node), anyBoolean());
doNothing().when(node).releaseContainer(any(Container.class));
}
@ -244,7 +245,6 @@ public class TestChildQueueOrder {
doReturn(true).when(app_0).containerCompleted(any(RMContainer.class),
any(ContainerStatus.class),any(RMContainerEventType.class));
//
Priority priority = TestUtils.createMockPriority(1);
ContainerAllocationExpirer expirer =
mock(ContainerAllocationExpirer.class);
@ -269,14 +269,14 @@ public class TestChildQueueOrder {
stubQueueAllocation(b, clusterResource, node_0, 0*GB);
stubQueueAllocation(c, clusterResource, node_0, 0*GB);
stubQueueAllocation(d, clusterResource, node_0, 0*GB);
root.assignContainers(clusterResource, node_0);
root.assignContainers(clusterResource, node_0, false);
for(int i=0; i < 2; i++)
{
stubQueueAllocation(a, clusterResource, node_0, 0*GB);
stubQueueAllocation(b, clusterResource, node_0, 1*GB);
stubQueueAllocation(c, clusterResource, node_0, 0*GB);
stubQueueAllocation(d, clusterResource, node_0, 0*GB);
root.assignContainers(clusterResource, node_0);
root.assignContainers(clusterResource, node_0, false);
}
for(int i=0; i < 3; i++)
{
@ -284,7 +284,7 @@ public class TestChildQueueOrder {
stubQueueAllocation(b, clusterResource, node_0, 0*GB);
stubQueueAllocation(c, clusterResource, node_0, 1*GB);
stubQueueAllocation(d, clusterResource, node_0, 0*GB);
root.assignContainers(clusterResource, node_0);
root.assignContainers(clusterResource, node_0, false);
}
for(int i=0; i < 4; i++)
{
@ -292,7 +292,7 @@ public class TestChildQueueOrder {
stubQueueAllocation(b, clusterResource, node_0, 0*GB);
stubQueueAllocation(c, clusterResource, node_0, 0*GB);
stubQueueAllocation(d, clusterResource, node_0, 1*GB);
root.assignContainers(clusterResource, node_0);
root.assignContainers(clusterResource, node_0, false);
}
verifyQueueMetrics(a, 1*GB, clusterResource);
verifyQueueMetrics(b, 2*GB, clusterResource);
@ -305,7 +305,7 @@ public class TestChildQueueOrder {
for(int i=0; i < 3;i++)
{
d.completedContainer(clusterResource, app_0, node_0,
rmContainer, null, RMContainerEventType.KILL, null);
rmContainer, null, RMContainerEventType.KILL, null, true);
}
verifyQueueMetrics(a, 1*GB, clusterResource);
verifyQueueMetrics(b, 2*GB, clusterResource);
@ -325,7 +325,7 @@ public class TestChildQueueOrder {
stubQueueAllocation(b, clusterResource, node_0, 0*GB);
stubQueueAllocation(c, clusterResource, node_0, 0*GB);
stubQueueAllocation(d, clusterResource, node_0, 0*GB);
root.assignContainers(clusterResource, node_0);
root.assignContainers(clusterResource, node_0, false);
}
verifyQueueMetrics(a, 3*GB, clusterResource);
verifyQueueMetrics(b, 2*GB, clusterResource);
@ -336,7 +336,7 @@ public class TestChildQueueOrder {
//Release 1GB Container from A
a.completedContainer(clusterResource, app_0, node_0,
rmContainer, null, RMContainerEventType.KILL, null);
rmContainer, null, RMContainerEventType.KILL, null, true);
verifyQueueMetrics(a, 2*GB, clusterResource);
verifyQueueMetrics(b, 2*GB, clusterResource);
verifyQueueMetrics(c, 3*GB, clusterResource);
@ -352,7 +352,7 @@ public class TestChildQueueOrder {
stubQueueAllocation(b, clusterResource, node_0, 1*GB);
stubQueueAllocation(c, clusterResource, node_0, 0*GB);
stubQueueAllocation(d, clusterResource, node_0, 0*GB);
root.assignContainers(clusterResource, node_0);
root.assignContainers(clusterResource, node_0, false);
verifyQueueMetrics(a, 2*GB, clusterResource);
verifyQueueMetrics(b, 3*GB, clusterResource);
verifyQueueMetrics(c, 3*GB, clusterResource);
@ -362,7 +362,7 @@ public class TestChildQueueOrder {
//Release 1GB container resources from B
b.completedContainer(clusterResource, app_0, node_0,
rmContainer, null, RMContainerEventType.KILL, null);
rmContainer, null, RMContainerEventType.KILL, null, true);
verifyQueueMetrics(a, 2*GB, clusterResource);
verifyQueueMetrics(b, 2*GB, clusterResource);
verifyQueueMetrics(c, 3*GB, clusterResource);
@ -378,7 +378,7 @@ public class TestChildQueueOrder {
stubQueueAllocation(b, clusterResource, node_0, 0*GB);
stubQueueAllocation(c, clusterResource, node_0, 0*GB);
stubQueueAllocation(d, clusterResource, node_0, 0*GB);
root.assignContainers(clusterResource, node_0);
root.assignContainers(clusterResource, node_0, false);
verifyQueueMetrics(a, 3*GB, clusterResource);
verifyQueueMetrics(b, 2*GB, clusterResource);
verifyQueueMetrics(c, 3*GB, clusterResource);
@ -392,12 +392,12 @@ public class TestChildQueueOrder {
stubQueueAllocation(b, clusterResource, node_0, 1*GB);
stubQueueAllocation(c, clusterResource, node_0, 0*GB);
stubQueueAllocation(d, clusterResource, node_0, 1*GB);
root.assignContainers(clusterResource, node_0);
root.assignContainers(clusterResource, node_0, false);
InOrder allocationOrder = inOrder(d,b);
allocationOrder.verify(d).assignContainers(eq(clusterResource),
any(FiCaSchedulerNode.class));
any(FiCaSchedulerNode.class), anyBoolean());
allocationOrder.verify(b).assignContainers(eq(clusterResource),
any(FiCaSchedulerNode.class));
any(FiCaSchedulerNode.class), anyBoolean());
verifyQueueMetrics(a, 3*GB, clusterResource);
verifyQueueMetrics(b, 2*GB, clusterResource);
verifyQueueMetrics(c, 3*GB, clusterResource);

View File

@ -22,6 +22,7 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doNothing;
@ -252,7 +253,7 @@ public class TestLeafQueue {
doNothing().when(parent).completedContainer(
any(Resource.class), any(FiCaSchedulerApp.class), any(FiCaSchedulerNode.class),
any(RMContainer.class), any(ContainerStatus.class),
any(RMContainerEventType.class), any(CSQueue.class));
any(RMContainerEventType.class), any(CSQueue.class), anyBoolean());
return queue;
}
@ -325,7 +326,7 @@ public class TestLeafQueue {
// Start testing...
// Only 1 container
a.assignContainers(clusterResource, node_0);
a.assignContainers(clusterResource, node_0, false);
assertEquals(
(int)(node_0.getTotalResource().getMemory() * a.getCapacity()) - (1*GB),
a.getMetrics().getAvailableMB());
@ -460,7 +461,7 @@ public class TestLeafQueue {
// Start testing...
// Only 1 container
a.assignContainers(clusterResource, node_0);
a.assignContainers(clusterResource, node_0, false);
assertEquals(1*GB, a.getUsedResources().getMemory());
assertEquals(1*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
@ -470,7 +471,7 @@ public class TestLeafQueue {
// Also 2nd -> minCapacity = 1024 since (.1 * 8G) < minAlloc, also
// you can get one container more than user-limit
a.assignContainers(clusterResource, node_0);
a.assignContainers(clusterResource, node_0, false);
assertEquals(2*GB, a.getUsedResources().getMemory());
assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
@ -478,7 +479,7 @@ public class TestLeafQueue {
assertEquals(2*GB, a.getMetrics().getAllocatedMB());
// Can't allocate 3rd due to user-limit
a.assignContainers(clusterResource, node_0);
a.assignContainers(clusterResource, node_0, false);
assertEquals(2*GB, a.getUsedResources().getMemory());
assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
@ -487,7 +488,7 @@ public class TestLeafQueue {
// Bump up user-limit-factor, now allocate should work
a.setUserLimitFactor(10);
a.assignContainers(clusterResource, node_0);
a.assignContainers(clusterResource, node_0, false);
assertEquals(3*GB, a.getUsedResources().getMemory());
assertEquals(3*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
@ -495,7 +496,7 @@ public class TestLeafQueue {
assertEquals(3*GB, a.getMetrics().getAllocatedMB());
// One more should work, for app_1, due to user-limit-factor
a.assignContainers(clusterResource, node_0);
a.assignContainers(clusterResource, node_0, false);
assertEquals(4*GB, a.getUsedResources().getMemory());
assertEquals(3*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(1*GB, app_1.getCurrentConsumption().getMemory());
@ -505,7 +506,7 @@ public class TestLeafQueue {
// Test max-capacity
// Now - no more allocs since we are at max-cap
a.setMaxCapacity(0.5f);
a.assignContainers(clusterResource, node_0);
a.assignContainers(clusterResource, node_0, false);
assertEquals(4*GB, a.getUsedResources().getMemory());
assertEquals(3*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(1*GB, app_1.getCurrentConsumption().getMemory());
@ -518,7 +519,7 @@ public class TestLeafQueue {
ContainerStatus.newInstance(rmContainer.getContainerId(),
ContainerState.COMPLETE, "",
ContainerExitStatus.KILLED_BY_RESOURCEMANAGER),
RMContainerEventType.KILL, null);
RMContainerEventType.KILL, null, true);
}
assertEquals(1*GB, a.getUsedResources().getMemory());
assertEquals(0*GB, app_0.getCurrentConsumption().getMemory());
@ -532,7 +533,7 @@ public class TestLeafQueue {
ContainerStatus.newInstance(rmContainer.getContainerId(),
ContainerState.COMPLETE, "",
ContainerExitStatus.KILLED_BY_RESOURCEMANAGER),
RMContainerEventType.KILL, null);
RMContainerEventType.KILL, null, true);
}
assertEquals(0*GB, a.getUsedResources().getMemory());
@ -620,19 +621,19 @@ public class TestLeafQueue {
// recordFactory)));
// 1 container to user_0
a.assignContainers(clusterResource, node_0);
a.assignContainers(clusterResource, node_0, false);
assertEquals(2*GB, a.getUsedResources().getMemory());
assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
// Again one to user_0 since he hasn't exceeded user limit yet
a.assignContainers(clusterResource, node_0);
a.assignContainers(clusterResource, node_0, false);
assertEquals(3*GB, a.getUsedResources().getMemory());
assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(1*GB, app_1.getCurrentConsumption().getMemory());
// One more to user_0 since he is the only active user
a.assignContainers(clusterResource, node_1);
a.assignContainers(clusterResource, node_1, false);
assertEquals(4*GB, a.getUsedResources().getMemory());
assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(2*GB, app_1.getCurrentConsumption().getMemory());
@ -705,7 +706,7 @@ public class TestLeafQueue {
1, a.getActiveUsersManager().getNumActiveUsers());
// 1 container to user_0
a.assignContainers(clusterResource, node_0);
a.assignContainers(clusterResource, node_0, false);
assertEquals(2*GB, a.getUsedResources().getMemory());
assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
@ -713,7 +714,7 @@ public class TestLeafQueue {
assertEquals(0*GB, app_1.getHeadroom().getMemory()); // User limit = 2G
// Again one to user_0 since he hasn't exceeded user limit yet
a.assignContainers(clusterResource, node_0);
a.assignContainers(clusterResource, node_0, false);
assertEquals(3*GB, a.getUsedResources().getMemory());
assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(1*GB, app_1.getCurrentConsumption().getMemory());
@ -729,7 +730,7 @@ public class TestLeafQueue {
// No more to user_0 since he is already over user-limit
// and no more containers to queue since it's already at max-cap
a.assignContainers(clusterResource, node_1);
a.assignContainers(clusterResource, node_1, false);
assertEquals(3*GB, a.getUsedResources().getMemory());
assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(1*GB, app_1.getCurrentConsumption().getMemory());
@ -743,7 +744,7 @@ public class TestLeafQueue {
TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 0, true,
priority, recordFactory)));
assertEquals(1, a.getActiveUsersManager().getNumActiveUsers());
a.assignContainers(clusterResource, node_1);
a.assignContainers(clusterResource, node_1, false);
assertEquals(1*GB, app_2.getHeadroom().getMemory()); // hit queue max-cap
}
@ -813,21 +814,21 @@ public class TestLeafQueue {
*/
// Only 1 container
a.assignContainers(clusterResource, node_0);
a.assignContainers(clusterResource, node_0, false);
assertEquals(1*GB, a.getUsedResources().getMemory());
assertEquals(1*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
// Also 2nd -> minCapacity = 1024 since (.1 * 8G) < minAlloc, also
// you can get one container more than user-limit
a.assignContainers(clusterResource, node_0);
a.assignContainers(clusterResource, node_0, false);
assertEquals(2*GB, a.getUsedResources().getMemory());
assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
// Can't allocate 3rd due to user-limit
a.setUserLimit(25);
a.assignContainers(clusterResource, node_0);
a.assignContainers(clusterResource, node_0, false);
assertEquals(2*GB, a.getUsedResources().getMemory());
assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
@ -845,7 +846,7 @@ public class TestLeafQueue {
// Now allocations should goto app_2 since
// user_0 is at limit inspite of high user-limit-factor
a.setUserLimitFactor(10);
a.assignContainers(clusterResource, node_0);
a.assignContainers(clusterResource, node_0, false);
assertEquals(5*GB, a.getUsedResources().getMemory());
assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
@ -854,7 +855,7 @@ public class TestLeafQueue {
// Now allocations should goto app_0 since
// user_0 is at user-limit not above it
a.assignContainers(clusterResource, node_0);
a.assignContainers(clusterResource, node_0, false);
assertEquals(6*GB, a.getUsedResources().getMemory());
assertEquals(3*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
@ -864,7 +865,7 @@ public class TestLeafQueue {
// Test max-capacity
// Now - no more allocs since we are at max-cap
a.setMaxCapacity(0.5f);
a.assignContainers(clusterResource, node_0);
a.assignContainers(clusterResource, node_0, false);
assertEquals(6*GB, a.getUsedResources().getMemory());
assertEquals(3*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
@ -875,7 +876,7 @@ public class TestLeafQueue {
// Now, allocations should goto app_3 since it's under user-limit
a.setMaxCapacity(1.0f);
a.setUserLimitFactor(1);
a.assignContainers(clusterResource, node_0);
a.assignContainers(clusterResource, node_0, false);
assertEquals(7*GB, a.getUsedResources().getMemory());
assertEquals(3*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
@ -883,7 +884,7 @@ public class TestLeafQueue {
assertEquals(1*GB, app_3.getCurrentConsumption().getMemory());
// Now we should assign to app_3 again since user_2 is under user-limit
a.assignContainers(clusterResource, node_0);
a.assignContainers(clusterResource, node_0, false);
assertEquals(8*GB, a.getUsedResources().getMemory());
assertEquals(3*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
@ -896,7 +897,7 @@ public class TestLeafQueue {
ContainerStatus.newInstance(rmContainer.getContainerId(),
ContainerState.COMPLETE, "",
ContainerExitStatus.KILLED_BY_RESOURCEMANAGER),
RMContainerEventType.KILL, null);
RMContainerEventType.KILL, null, true);
}
assertEquals(5*GB, a.getUsedResources().getMemory());
assertEquals(0*GB, app_0.getCurrentConsumption().getMemory());
@ -910,7 +911,7 @@ public class TestLeafQueue {
ContainerStatus.newInstance(rmContainer.getContainerId(),
ContainerState.COMPLETE, "",
ContainerExitStatus.KILLED_BY_RESOURCEMANAGER),
RMContainerEventType.KILL, null);
RMContainerEventType.KILL, null, true);
}
assertEquals(2*GB, a.getUsedResources().getMemory());
assertEquals(0*GB, app_0.getCurrentConsumption().getMemory());
@ -924,7 +925,7 @@ public class TestLeafQueue {
ContainerStatus.newInstance(rmContainer.getContainerId(),
ContainerState.COMPLETE, "",
ContainerExitStatus.KILLED_BY_RESOURCEMANAGER),
RMContainerEventType.KILL, null);
RMContainerEventType.KILL, null, true);
}
assertEquals(0*GB, a.getUsedResources().getMemory());
assertEquals(0*GB, app_0.getCurrentConsumption().getMemory());
@ -982,7 +983,7 @@ public class TestLeafQueue {
// Start testing...
// Only 1 container
a.assignContainers(clusterResource, node_0);
a.assignContainers(clusterResource, node_0, false);
assertEquals(1*GB, a.getUsedResources().getMemory());
assertEquals(1*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
@ -992,7 +993,7 @@ public class TestLeafQueue {
// Also 2nd -> minCapacity = 1024 since (.1 * 8G) < minAlloc, also
// you can get one container more than user-limit
a.assignContainers(clusterResource, node_0);
a.assignContainers(clusterResource, node_0, false);
assertEquals(2*GB, a.getUsedResources().getMemory());
assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
@ -1000,7 +1001,7 @@ public class TestLeafQueue {
assertEquals(2*GB, a.getMetrics().getAllocatedMB());
// Now, reservation should kick in for app_1
a.assignContainers(clusterResource, node_0);
a.assignContainers(clusterResource, node_0, false);
assertEquals(6*GB, a.getUsedResources().getMemory());
assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
@ -1015,8 +1016,8 @@ public class TestLeafQueue {
ContainerStatus.newInstance(rmContainer.getContainerId(),
ContainerState.COMPLETE, "",
ContainerExitStatus.KILLED_BY_RESOURCEMANAGER),
RMContainerEventType.KILL, null);
a.assignContainers(clusterResource, node_0);
RMContainerEventType.KILL, null, true);
a.assignContainers(clusterResource, node_0, false);
assertEquals(5*GB, a.getUsedResources().getMemory());
assertEquals(1*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
@ -1031,8 +1032,8 @@ public class TestLeafQueue {
ContainerStatus.newInstance(rmContainer.getContainerId(),
ContainerState.COMPLETE, "",
ContainerExitStatus.KILLED_BY_RESOURCEMANAGER),
RMContainerEventType.KILL, null);
a.assignContainers(clusterResource, node_0);
RMContainerEventType.KILL, null, true);
a.assignContainers(clusterResource, node_0, false);
assertEquals(4*GB, a.getUsedResources().getMemory());
assertEquals(0*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(4*GB, app_1.getCurrentConsumption().getMemory());
@ -1099,7 +1100,7 @@ public class TestLeafQueue {
// Start testing...
a.assignContainers(clusterResource, node_0);
a.assignContainers(clusterResource, node_0, false);
assertEquals(2*GB, a.getUsedResources().getMemory());
assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
@ -1108,7 +1109,7 @@ public class TestLeafQueue {
assertEquals(0*GB, a.getMetrics().getAvailableMB());
// Now, reservation should kick in for app_1
a.assignContainers(clusterResource, node_0);
a.assignContainers(clusterResource, node_0, false);
assertEquals(6*GB, a.getUsedResources().getMemory());
assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
@ -1121,7 +1122,7 @@ public class TestLeafQueue {
// We do not need locality delay here
doReturn(-1).when(a).getNodeLocalityDelay();
a.assignContainers(clusterResource, node_1);
a.assignContainers(clusterResource, node_1, false);
assertEquals(10*GB, a.getUsedResources().getMemory());
assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(4*GB, app_1.getCurrentConsumption().getMemory());
@ -1136,8 +1137,8 @@ public class TestLeafQueue {
ContainerStatus.newInstance(rmContainer.getContainerId(),
ContainerState.COMPLETE, "",
ContainerExitStatus.KILLED_BY_RESOURCEMANAGER),
RMContainerEventType.KILL, null);
a.assignContainers(clusterResource, node_0);
RMContainerEventType.KILL, null, true);
a.assignContainers(clusterResource, node_0, false);
assertEquals(8*GB, a.getUsedResources().getMemory());
assertEquals(0*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(8*GB, app_1.getCurrentConsumption().getMemory());
@ -1205,20 +1206,20 @@ public class TestLeafQueue {
// Start testing...
// Only 1 container
a.assignContainers(clusterResource, node_0);
a.assignContainers(clusterResource, node_0, false);
assertEquals(1*GB, a.getUsedResources().getMemory());
assertEquals(1*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
// Also 2nd -> minCapacity = 1024 since (.1 * 8G) < minAlloc, also
// you can get one container more than user-limit
a.assignContainers(clusterResource, node_0);
a.assignContainers(clusterResource, node_0, false);
assertEquals(2*GB, a.getUsedResources().getMemory());
assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
// Now, reservation should kick in for app_1
a.assignContainers(clusterResource, node_0);
a.assignContainers(clusterResource, node_0, false);
assertEquals(6*GB, a.getUsedResources().getMemory());
assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
@ -1231,8 +1232,8 @@ public class TestLeafQueue {
ContainerStatus.newInstance(rmContainer.getContainerId(),
ContainerState.COMPLETE, "",
ContainerExitStatus.KILLED_BY_RESOURCEMANAGER),
RMContainerEventType.KILL, null);
a.assignContainers(clusterResource, node_0);
RMContainerEventType.KILL, null, true);
a.assignContainers(clusterResource, node_0, false);
assertEquals(5*GB, a.getUsedResources().getMemory());
assertEquals(1*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
@ -1241,7 +1242,7 @@ public class TestLeafQueue {
assertEquals(1, app_1.getReReservations(priority));
// Re-reserve
a.assignContainers(clusterResource, node_0);
a.assignContainers(clusterResource, node_0, false);
assertEquals(5*GB, a.getUsedResources().getMemory());
assertEquals(1*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
@ -1250,7 +1251,7 @@ public class TestLeafQueue {
assertEquals(2, app_1.getReReservations(priority));
// Try to schedule on node_1 now, should *move* the reservation
a.assignContainers(clusterResource, node_1);
a.assignContainers(clusterResource, node_1, false);
assertEquals(9*GB, a.getUsedResources().getMemory());
assertEquals(1*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(4*GB, app_1.getCurrentConsumption().getMemory());
@ -1266,8 +1267,8 @@ public class TestLeafQueue {
ContainerStatus.newInstance(rmContainer.getContainerId(),
ContainerState.COMPLETE, "",
ContainerExitStatus.KILLED_BY_RESOURCEMANAGER),
RMContainerEventType.KILL, null);
CSAssignment assignment = a.assignContainers(clusterResource, node_0);
RMContainerEventType.KILL, null, true);
CSAssignment assignment = a.assignContainers(clusterResource, node_0, false);
assertEquals(8*GB, a.getUsedResources().getMemory());
assertEquals(0*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(4*GB, app_1.getCurrentConsumption().getMemory());
@ -1278,6 +1279,7 @@ public class TestLeafQueue {
}
@Test
public void testLocalityScheduling() throws Exception {
@ -1337,7 +1339,7 @@ public class TestLeafQueue {
CSAssignment assignment = null;
// Start with off switch, shouldn't allocate due to delay scheduling
assignment = a.assignContainers(clusterResource, node_2);
assignment = a.assignContainers(clusterResource, node_2, false);
verify(app_0, never()).allocate(any(NodeType.class), eq(node_2),
any(Priority.class), any(ResourceRequest.class), any(Container.class));
assertEquals(1, app_0.getSchedulingOpportunities(priority));
@ -1345,7 +1347,7 @@ public class TestLeafQueue {
assertEquals(NodeType.NODE_LOCAL, assignment.getType()); // None->NODE_LOCAL
// Another off switch, shouldn't allocate due to delay scheduling
assignment = a.assignContainers(clusterResource, node_2);
assignment = a.assignContainers(clusterResource, node_2, false);
verify(app_0, never()).allocate(any(NodeType.class), eq(node_2),
any(Priority.class), any(ResourceRequest.class), any(Container.class));
assertEquals(2, app_0.getSchedulingOpportunities(priority));
@ -1353,7 +1355,7 @@ public class TestLeafQueue {
assertEquals(NodeType.NODE_LOCAL, assignment.getType()); // None->NODE_LOCAL
// Another off switch, shouldn't allocate due to delay scheduling
assignment = a.assignContainers(clusterResource, node_2);
assignment = a.assignContainers(clusterResource, node_2, false);
verify(app_0, never()).allocate(any(NodeType.class), eq(node_2),
any(Priority.class), any(ResourceRequest.class), any(Container.class));
assertEquals(3, app_0.getSchedulingOpportunities(priority));
@ -1362,7 +1364,7 @@ public class TestLeafQueue {
// Another off switch, now we should allocate
// since missedOpportunities=3 and reqdContainers=3
assignment = a.assignContainers(clusterResource, node_2);
assignment = a.assignContainers(clusterResource, node_2, false);
verify(app_0).allocate(eq(NodeType.OFF_SWITCH), eq(node_2),
any(Priority.class), any(ResourceRequest.class), any(Container.class));
assertEquals(4, app_0.getSchedulingOpportunities(priority)); // should NOT reset
@ -1370,7 +1372,7 @@ public class TestLeafQueue {
assertEquals(NodeType.OFF_SWITCH, assignment.getType());
// NODE_LOCAL - node_0
assignment = a.assignContainers(clusterResource, node_0);
assignment = a.assignContainers(clusterResource, node_0, false);
verify(app_0).allocate(eq(NodeType.NODE_LOCAL), eq(node_0),
any(Priority.class), any(ResourceRequest.class), any(Container.class));
assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should reset
@ -1378,7 +1380,7 @@ public class TestLeafQueue {
assertEquals(NodeType.NODE_LOCAL, assignment.getType());
// NODE_LOCAL - node_1
assignment = a.assignContainers(clusterResource, node_1);
assignment = a.assignContainers(clusterResource, node_1, false);
verify(app_0).allocate(eq(NodeType.NODE_LOCAL), eq(node_1),
any(Priority.class), any(ResourceRequest.class), any(Container.class));
assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should reset
@ -1406,13 +1408,13 @@ public class TestLeafQueue {
doReturn(1).when(a).getNodeLocalityDelay();
// Shouldn't assign RACK_LOCAL yet
assignment = a.assignContainers(clusterResource, node_3);
assignment = a.assignContainers(clusterResource, node_3, false);
assertEquals(1, app_0.getSchedulingOpportunities(priority));
assertEquals(2, app_0.getTotalRequiredResources(priority));
assertEquals(NodeType.NODE_LOCAL, assignment.getType()); // None->NODE_LOCAL
// Should assign RACK_LOCAL now
assignment = a.assignContainers(clusterResource, node_3);
assignment = a.assignContainers(clusterResource, node_3, false);
verify(app_0).allocate(eq(NodeType.RACK_LOCAL), eq(node_3),
any(Priority.class), any(ResourceRequest.class), any(Container.class));
assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should reset
@ -1493,7 +1495,7 @@ public class TestLeafQueue {
// Start with off switch, shouldn't allocate P1 due to delay scheduling
// thus, no P2 either!
a.assignContainers(clusterResource, node_2);
a.assignContainers(clusterResource, node_2, false);
verify(app_0, never()).allocate(any(NodeType.class), eq(node_2),
eq(priority_1), any(ResourceRequest.class), any(Container.class));
assertEquals(1, app_0.getSchedulingOpportunities(priority_1));
@ -1505,7 +1507,7 @@ public class TestLeafQueue {
// Another off-switch, shouldn't allocate P1 due to delay scheduling
// thus, no P2 either!
a.assignContainers(clusterResource, node_2);
a.assignContainers(clusterResource, node_2, false);
verify(app_0, never()).allocate(any(NodeType.class), eq(node_2),
eq(priority_1), any(ResourceRequest.class), any(Container.class));
assertEquals(2, app_0.getSchedulingOpportunities(priority_1));
@ -1516,7 +1518,7 @@ public class TestLeafQueue {
assertEquals(1, app_0.getTotalRequiredResources(priority_2));
// Another off-switch, shouldn't allocate OFF_SWITCH P1
a.assignContainers(clusterResource, node_2);
a.assignContainers(clusterResource, node_2, false);
verify(app_0).allocate(eq(NodeType.OFF_SWITCH), eq(node_2),
eq(priority_1), any(ResourceRequest.class), any(Container.class));
assertEquals(3, app_0.getSchedulingOpportunities(priority_1));
@ -1527,7 +1529,7 @@ public class TestLeafQueue {
assertEquals(1, app_0.getTotalRequiredResources(priority_2));
// Now, DATA_LOCAL for P1
a.assignContainers(clusterResource, node_0);
a.assignContainers(clusterResource, node_0, false);
verify(app_0).allocate(eq(NodeType.NODE_LOCAL), eq(node_0),
eq(priority_1), any(ResourceRequest.class), any(Container.class));
assertEquals(0, app_0.getSchedulingOpportunities(priority_1));
@ -1538,7 +1540,7 @@ public class TestLeafQueue {
assertEquals(1, app_0.getTotalRequiredResources(priority_2));
// Now, OFF_SWITCH for P2
a.assignContainers(clusterResource, node_1);
a.assignContainers(clusterResource, node_1, false);
verify(app_0, never()).allocate(any(NodeType.class), eq(node_1),
eq(priority_1), any(ResourceRequest.class), any(Container.class));
assertEquals(0, app_0.getSchedulingOpportunities(priority_1));
@ -1614,7 +1616,7 @@ public class TestLeafQueue {
app_0.updateResourceRequests(app_0_requests_0);
// NODE_LOCAL - node_0_1
a.assignContainers(clusterResource, node_0_0);
a.assignContainers(clusterResource, node_0_0, false);
verify(app_0).allocate(eq(NodeType.NODE_LOCAL), eq(node_0_0),
any(Priority.class), any(ResourceRequest.class), any(Container.class));
assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should reset
@ -1622,7 +1624,7 @@ public class TestLeafQueue {
// No allocation on node_1_0 even though it's node/rack local since
// required(ANY) == 0
a.assignContainers(clusterResource, node_1_0);
a.assignContainers(clusterResource, node_1_0, false);
verify(app_0, never()).allocate(any(NodeType.class), eq(node_1_0),
any(Priority.class), any(ResourceRequest.class), any(Container.class));
assertEquals(0, app_0.getSchedulingOpportunities(priority)); // Still zero
@ -1638,14 +1640,14 @@ public class TestLeafQueue {
// No allocation on node_0_1 even though it's node/rack local since
// required(rack_1) == 0
a.assignContainers(clusterResource, node_0_1);
a.assignContainers(clusterResource, node_0_1, false);
verify(app_0, never()).allocate(any(NodeType.class), eq(node_1_0),
any(Priority.class), any(ResourceRequest.class), any(Container.class));
assertEquals(1, app_0.getSchedulingOpportunities(priority));
assertEquals(1, app_0.getTotalRequiredResources(priority));
// NODE_LOCAL - node_1
a.assignContainers(clusterResource, node_1_0);
a.assignContainers(clusterResource, node_1_0, false);
verify(app_0).allocate(eq(NodeType.NODE_LOCAL), eq(node_1_0),
any(Priority.class), any(ResourceRequest.class), any(Container.class));
assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should reset
@ -1889,7 +1891,7 @@ public class TestLeafQueue {
// node_0_1
// Shouldn't allocate since RR(rack_0) = null && RR(ANY) = relax: false
a.assignContainers(clusterResource, node_0_1);
a.assignContainers(clusterResource, node_0_1, false);
verify(app_0, never()).allocate(any(NodeType.class), eq(node_0_1),
any(Priority.class), any(ResourceRequest.class), any(Container.class));
assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should be 0
@ -1911,7 +1913,7 @@ public class TestLeafQueue {
// node_1_1
// Shouldn't allocate since RR(rack_1) = relax: false
a.assignContainers(clusterResource, node_1_1);
a.assignContainers(clusterResource, node_1_1, false);
verify(app_0, never()).allocate(any(NodeType.class), eq(node_0_1),
any(Priority.class), any(ResourceRequest.class), any(Container.class));
assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should be 0
@ -1941,7 +1943,7 @@ public class TestLeafQueue {
// node_1_1
// Shouldn't allocate since node_1_1 is blacklisted
a.assignContainers(clusterResource, node_1_1);
a.assignContainers(clusterResource, node_1_1, false);
verify(app_0, never()).allocate(any(NodeType.class), eq(node_1_1),
any(Priority.class), any(ResourceRequest.class), any(Container.class));
assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should be 0
@ -1969,7 +1971,7 @@ public class TestLeafQueue {
// node_1_1
// Shouldn't allocate since rack_1 is blacklisted
a.assignContainers(clusterResource, node_1_1);
a.assignContainers(clusterResource, node_1_1, false);
verify(app_0, never()).allocate(any(NodeType.class), eq(node_1_1),
any(Priority.class), any(ResourceRequest.class), any(Container.class));
assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should be 0
@ -1995,7 +1997,7 @@ public class TestLeafQueue {
// Blacklist: < host_0_0 > <----
// Now, should allocate since RR(rack_1) = relax: true
a.assignContainers(clusterResource, node_1_1);
a.assignContainers(clusterResource, node_1_1, false);
verify(app_0,never()).allocate(eq(NodeType.RACK_LOCAL), eq(node_1_1),
any(Priority.class), any(ResourceRequest.class), any(Container.class));
assertEquals(0, app_0.getSchedulingOpportunities(priority));
@ -2025,7 +2027,7 @@ public class TestLeafQueue {
// host_1_0: 8G
// host_1_1: 7G
a.assignContainers(clusterResource, node_1_0);
a.assignContainers(clusterResource, node_1_0, false);
verify(app_0).allocate(eq(NodeType.NODE_LOCAL), eq(node_1_0),
any(Priority.class), any(ResourceRequest.class), any(Container.class));
assertEquals(0, app_0.getSchedulingOpportunities(priority));
@ -2105,7 +2107,7 @@ public class TestLeafQueue {
recordFactory)));
try {
a.assignContainers(clusterResource, node_0);
a.assignContainers(clusterResource, node_0, false);
} catch (NullPointerException e) {
Assert.fail("NPE when allocating container on node but "
+ "forget to set off-switch request should be handled");

View File

@ -23,6 +23,7 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
@ -153,7 +154,7 @@ public class TestParentQueue {
// Next call - nothing
if (allocation > 0) {
doReturn(new CSAssignment(Resources.none(), type)).
when(queue).assignContainers(eq(clusterResource), eq(node));
when(queue).assignContainers(eq(clusterResource), eq(node), eq(false));
// Mock the node's resource availability
Resource available = node.getAvailableResource();
@ -164,7 +165,7 @@ public class TestParentQueue {
return new CSAssignment(allocatedResource, type);
}
}).
when(queue).assignContainers(eq(clusterResource), eq(node));
when(queue).assignContainers(eq(clusterResource), eq(node), eq(false));
}
private float computeQueueAbsoluteUsedCapacity(CSQueue queue,
@ -227,19 +228,19 @@ public class TestParentQueue {
// Simulate B returning a container on node_0
stubQueueAllocation(a, clusterResource, node_0, 0*GB);
stubQueueAllocation(b, clusterResource, node_0, 1*GB);
root.assignContainers(clusterResource, node_0);
root.assignContainers(clusterResource, node_0, false);
verifyQueueMetrics(a, 0*GB, clusterResource);
verifyQueueMetrics(b, 1*GB, clusterResource);
// Now, A should get the scheduling opportunity since A=0G/6G, B=1G/14G
stubQueueAllocation(a, clusterResource, node_1, 2*GB);
stubQueueAllocation(b, clusterResource, node_1, 1*GB);
root.assignContainers(clusterResource, node_1);
root.assignContainers(clusterResource, node_1, false);
InOrder allocationOrder = inOrder(a, b);
allocationOrder.verify(a).assignContainers(eq(clusterResource),
any(FiCaSchedulerNode.class));
any(FiCaSchedulerNode.class), anyBoolean());
allocationOrder.verify(b).assignContainers(eq(clusterResource),
any(FiCaSchedulerNode.class));
any(FiCaSchedulerNode.class), anyBoolean());
verifyQueueMetrics(a, 2*GB, clusterResource);
verifyQueueMetrics(b, 2*GB, clusterResource);
@ -247,12 +248,12 @@ public class TestParentQueue {
// since A has 2/6G while B has 2/14G
stubQueueAllocation(a, clusterResource, node_0, 1*GB);
stubQueueAllocation(b, clusterResource, node_0, 2*GB);
root.assignContainers(clusterResource, node_0);
root.assignContainers(clusterResource, node_0, false);
allocationOrder = inOrder(b, a);
allocationOrder.verify(b).assignContainers(eq(clusterResource),
any(FiCaSchedulerNode.class));
any(FiCaSchedulerNode.class), anyBoolean());
allocationOrder.verify(a).assignContainers(eq(clusterResource),
any(FiCaSchedulerNode.class));
any(FiCaSchedulerNode.class), anyBoolean());
verifyQueueMetrics(a, 3*GB, clusterResource);
verifyQueueMetrics(b, 4*GB, clusterResource);
@ -260,12 +261,12 @@ public class TestParentQueue {
// since A has 3/6G while B has 4/14G
stubQueueAllocation(a, clusterResource, node_0, 0*GB);
stubQueueAllocation(b, clusterResource, node_0, 4*GB);
root.assignContainers(clusterResource, node_0);
root.assignContainers(clusterResource, node_0, false);
allocationOrder = inOrder(b, a);
allocationOrder.verify(b).assignContainers(eq(clusterResource),
any(FiCaSchedulerNode.class));
any(FiCaSchedulerNode.class), anyBoolean());
allocationOrder.verify(a).assignContainers(eq(clusterResource),
any(FiCaSchedulerNode.class));
any(FiCaSchedulerNode.class), anyBoolean());
verifyQueueMetrics(a, 3*GB, clusterResource);
verifyQueueMetrics(b, 8*GB, clusterResource);
@ -273,12 +274,12 @@ public class TestParentQueue {
// since A has 3/6G while B has 8/14G
stubQueueAllocation(a, clusterResource, node_1, 1*GB);
stubQueueAllocation(b, clusterResource, node_1, 1*GB);
root.assignContainers(clusterResource, node_1);
root.assignContainers(clusterResource, node_1, false);
allocationOrder = inOrder(a, b);
allocationOrder.verify(b).assignContainers(eq(clusterResource),
any(FiCaSchedulerNode.class));
any(FiCaSchedulerNode.class), anyBoolean());
allocationOrder.verify(a).assignContainers(eq(clusterResource),
any(FiCaSchedulerNode.class));
any(FiCaSchedulerNode.class), anyBoolean());
verifyQueueMetrics(a, 4*GB, clusterResource);
verifyQueueMetrics(b, 9*GB, clusterResource);
}
@ -439,7 +440,7 @@ public class TestParentQueue {
stubQueueAllocation(b, clusterResource, node_0, 0*GB);
stubQueueAllocation(c, clusterResource, node_0, 1*GB);
stubQueueAllocation(d, clusterResource, node_0, 0*GB);
root.assignContainers(clusterResource, node_0);
root.assignContainers(clusterResource, node_0, false);
verifyQueueMetrics(a, 0*GB, clusterResource);
verifyQueueMetrics(b, 0*GB, clusterResource);
verifyQueueMetrics(c, 1*GB, clusterResource);
@ -451,7 +452,7 @@ public class TestParentQueue {
stubQueueAllocation(a, clusterResource, node_1, 0*GB);
stubQueueAllocation(b2, clusterResource, node_1, 4*GB);
stubQueueAllocation(c, clusterResource, node_1, 0*GB);
root.assignContainers(clusterResource, node_1);
root.assignContainers(clusterResource, node_1, false);
verifyQueueMetrics(a, 0*GB, clusterResource);
verifyQueueMetrics(b, 4*GB, clusterResource);
verifyQueueMetrics(c, 1*GB, clusterResource);
@ -462,14 +463,14 @@ public class TestParentQueue {
stubQueueAllocation(a1, clusterResource, node_0, 1*GB);
stubQueueAllocation(b3, clusterResource, node_0, 2*GB);
stubQueueAllocation(c, clusterResource, node_0, 2*GB);
root.assignContainers(clusterResource, node_0);
root.assignContainers(clusterResource, node_0, false);
InOrder allocationOrder = inOrder(a, c, b);
allocationOrder.verify(a).assignContainers(eq(clusterResource),
any(FiCaSchedulerNode.class));
any(FiCaSchedulerNode.class), anyBoolean());
allocationOrder.verify(c).assignContainers(eq(clusterResource),
any(FiCaSchedulerNode.class));
any(FiCaSchedulerNode.class), anyBoolean());
allocationOrder.verify(b).assignContainers(eq(clusterResource),
any(FiCaSchedulerNode.class));
any(FiCaSchedulerNode.class), anyBoolean());
verifyQueueMetrics(a, 1*GB, clusterResource);
verifyQueueMetrics(b, 6*GB, clusterResource);
verifyQueueMetrics(c, 3*GB, clusterResource);
@ -488,16 +489,16 @@ public class TestParentQueue {
stubQueueAllocation(b3, clusterResource, node_2, 1*GB);
stubQueueAllocation(b1, clusterResource, node_2, 1*GB);
stubQueueAllocation(c, clusterResource, node_2, 1*GB);
root.assignContainers(clusterResource, node_2);
root.assignContainers(clusterResource, node_2, false);
allocationOrder = inOrder(a, a2, a1, b, c);
allocationOrder.verify(a).assignContainers(eq(clusterResource),
any(FiCaSchedulerNode.class));
any(FiCaSchedulerNode.class), anyBoolean());
allocationOrder.verify(a2).assignContainers(eq(clusterResource),
any(FiCaSchedulerNode.class));
any(FiCaSchedulerNode.class), anyBoolean());
allocationOrder.verify(b).assignContainers(eq(clusterResource),
any(FiCaSchedulerNode.class));
any(FiCaSchedulerNode.class), anyBoolean());
allocationOrder.verify(c).assignContainers(eq(clusterResource),
any(FiCaSchedulerNode.class));
any(FiCaSchedulerNode.class), anyBoolean());
verifyQueueMetrics(a, 3*GB, clusterResource);
verifyQueueMetrics(b, 8*GB, clusterResource);
verifyQueueMetrics(c, 4*GB, clusterResource);
@ -597,7 +598,7 @@ public class TestParentQueue {
// Simulate B returning a container on node_0
stubQueueAllocation(a, clusterResource, node_0, 0*GB, NodeType.OFF_SWITCH);
stubQueueAllocation(b, clusterResource, node_0, 1*GB, NodeType.OFF_SWITCH);
root.assignContainers(clusterResource, node_0);
root.assignContainers(clusterResource, node_0, false);
verifyQueueMetrics(a, 0*GB, clusterResource);
verifyQueueMetrics(b, 1*GB, clusterResource);
@ -605,12 +606,12 @@ public class TestParentQueue {
// also, B gets a scheduling opportunity since A allocates RACK_LOCAL
stubQueueAllocation(a, clusterResource, node_1, 2*GB, NodeType.RACK_LOCAL);
stubQueueAllocation(b, clusterResource, node_1, 1*GB, NodeType.OFF_SWITCH);
root.assignContainers(clusterResource, node_1);
root.assignContainers(clusterResource, node_1, false);
InOrder allocationOrder = inOrder(a, b);
allocationOrder.verify(a).assignContainers(eq(clusterResource),
any(FiCaSchedulerNode.class));
any(FiCaSchedulerNode.class), anyBoolean());
allocationOrder.verify(b).assignContainers(eq(clusterResource),
any(FiCaSchedulerNode.class));
any(FiCaSchedulerNode.class), anyBoolean());
verifyQueueMetrics(a, 2*GB, clusterResource);
verifyQueueMetrics(b, 2*GB, clusterResource);
@ -619,12 +620,12 @@ public class TestParentQueue {
// However, since B returns off-switch, A won't get an opportunity
stubQueueAllocation(a, clusterResource, node_0, 1*GB, NodeType.NODE_LOCAL);
stubQueueAllocation(b, clusterResource, node_0, 2*GB, NodeType.OFF_SWITCH);
root.assignContainers(clusterResource, node_0);
root.assignContainers(clusterResource, node_0, false);
allocationOrder = inOrder(b, a);
allocationOrder.verify(b).assignContainers(eq(clusterResource),
any(FiCaSchedulerNode.class));
any(FiCaSchedulerNode.class), anyBoolean());
allocationOrder.verify(a).assignContainers(eq(clusterResource),
any(FiCaSchedulerNode.class));
any(FiCaSchedulerNode.class), anyBoolean());
verifyQueueMetrics(a, 2*GB, clusterResource);
verifyQueueMetrics(b, 4*GB, clusterResource);
@ -663,7 +664,7 @@ public class TestParentQueue {
// Simulate B3 returning a container on node_0
stubQueueAllocation(b2, clusterResource, node_0, 0*GB, NodeType.OFF_SWITCH);
stubQueueAllocation(b3, clusterResource, node_0, 1*GB, NodeType.OFF_SWITCH);
root.assignContainers(clusterResource, node_0);
root.assignContainers(clusterResource, node_0, false);
verifyQueueMetrics(b2, 0*GB, clusterResource);
verifyQueueMetrics(b3, 1*GB, clusterResource);
@ -671,12 +672,12 @@ public class TestParentQueue {
// also, B3 gets a scheduling opportunity since B2 allocates RACK_LOCAL
stubQueueAllocation(b2, clusterResource, node_1, 1*GB, NodeType.RACK_LOCAL);
stubQueueAllocation(b3, clusterResource, node_1, 1*GB, NodeType.OFF_SWITCH);
root.assignContainers(clusterResource, node_1);
root.assignContainers(clusterResource, node_1, false);
InOrder allocationOrder = inOrder(b2, b3);
allocationOrder.verify(b2).assignContainers(eq(clusterResource),
any(FiCaSchedulerNode.class));
any(FiCaSchedulerNode.class), anyBoolean());
allocationOrder.verify(b3).assignContainers(eq(clusterResource),
any(FiCaSchedulerNode.class));
any(FiCaSchedulerNode.class), anyBoolean());
verifyQueueMetrics(b2, 1*GB, clusterResource);
verifyQueueMetrics(b3, 2*GB, clusterResource);
@ -685,12 +686,12 @@ public class TestParentQueue {
// However, since B3 returns off-switch, B2 won't get an opportunity
stubQueueAllocation(b2, clusterResource, node_0, 1*GB, NodeType.NODE_LOCAL);
stubQueueAllocation(b3, clusterResource, node_0, 1*GB, NodeType.OFF_SWITCH);
root.assignContainers(clusterResource, node_0);
root.assignContainers(clusterResource, node_0, false);
allocationOrder = inOrder(b3, b2);
allocationOrder.verify(b3).assignContainers(eq(clusterResource),
any(FiCaSchedulerNode.class));
any(FiCaSchedulerNode.class), anyBoolean());
allocationOrder.verify(b2).assignContainers(eq(clusterResource),
any(FiCaSchedulerNode.class));
any(FiCaSchedulerNode.class), anyBoolean());
verifyQueueMetrics(b2, 1*GB, clusterResource);
verifyQueueMetrics(b3, 3*GB, clusterResource);