YARN-3983. Refactored CapacityScheduler#FiCaSchedulerApp to make it easier to extend container allocation logic. Contributed by Wangda Tan

(cherry picked from commit ba2313d614)
Jian He 2015-08-05 13:45:17 -07:00
parent 707b96fa58
commit 1466772827
11 changed files with 1011 additions and 692 deletions

hadoop-yarn-project/CHANGES.txt

@@ -339,6 +339,9 @@ Release 2.8.0 - UNRELEASED
YARN-2768. Avoid cloning Resource in FSAppAttempt#updateDemand.
(Hong Zhiguo via kasha)
YARN-3983. Refactored CapacityScheduler#FiCaSchedulerApp to make it easier
to extend container allocation logic. (Wangda Tan via jianhe)
BUG FIXES
YARN-3197. Confusing log generated by CapacityScheduler. (Varun Saxena

AbstractCSQueue.java

@@ -53,13 +53,7 @@
import com.google.common.collect.Sets;
public abstract class AbstractCSQueue implements CSQueue {
private static final Log LOG = LogFactory.getLog(AbstractCSQueue.class);
static final CSAssignment NULL_ASSIGNMENT =
new CSAssignment(Resources.createResource(0, 0), NodeType.NODE_LOCAL);
static final CSAssignment SKIP_ASSIGNMENT = new CSAssignment(true);
CSQueue parent;
final String queueName;
volatile int numContainers;

CSAssignment.java

@@ -24,12 +24,17 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.AssignmentInformation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.util.resource.Resources;
@Private
@Unstable
public class CSAssignment {
public static final CSAssignment NULL_ASSIGNMENT =
new CSAssignment(Resources.createResource(0, 0), NodeType.NODE_LOCAL);
final private Resource resource;
public static final CSAssignment SKIP_ASSIGNMENT = new CSAssignment(true);
private Resource resource;
private NodeType type;
private RMContainer excessReservation;
private FiCaSchedulerApp application;
@@ -67,6 +72,10 @@ public CSAssignment(Resource resource, NodeType type,
public Resource getResource() {
return resource;
}
public void setResource(Resource resource) {
this.resource = resource;
}
public NodeType getType() {
return type;

LeafQueue.java

@@ -777,7 +777,7 @@ public synchronized CSAssignment assignContainers(Resource clusterResource,
// if our queue cannot access this node, just return
if (schedulingMode == SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY
&& !accessibleToPartition(node.getPartition())) {
return NULL_ASSIGNMENT;
return CSAssignment.NULL_ASSIGNMENT;
}
// Check if this queue need more resource, simply skip allocation if this
@@ -789,7 +789,7 @@ public synchronized CSAssignment assignContainers(Resource clusterResource,
+ ", because it doesn't need more resource, schedulingMode="
+ schedulingMode.name() + " node-partition=" + node.getPartition());
}
return NULL_ASSIGNMENT;
return CSAssignment.NULL_ASSIGNMENT;
}
for (Iterator<FiCaSchedulerApp> assignmentIterator =
@@ -800,7 +800,7 @@ public synchronized CSAssignment assignContainers(Resource clusterResource,
if (!super.canAssignToThisQueue(clusterResource, node.getPartition(),
currentResourceLimits, application.getCurrentReservation(),
schedulingMode)) {
return NULL_ASSIGNMENT;
return CSAssignment.NULL_ASSIGNMENT;
}
Resource userLimit =
@@ -846,11 +846,11 @@ public synchronized CSAssignment assignContainers(Resource clusterResource,
} else if (!assignment.getSkipped()) {
// If we don't allocate anything, and it is not skipped by application,
// we will return to respect FIFO of applications
return NULL_ASSIGNMENT;
return CSAssignment.NULL_ASSIGNMENT;
}
}
return NULL_ASSIGNMENT;
return CSAssignment.NULL_ASSIGNMENT;
}
protected Resource getHeadroom(User user, Resource queueCurrentLimit,

ParentQueue.java

@@ -384,7 +384,7 @@ public synchronized CSAssignment assignContainers(Resource clusterResource,
// if our queue cannot access this node, just return
if (schedulingMode == SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY
&& !accessibleToPartition(node.getPartition())) {
return NULL_ASSIGNMENT;
return CSAssignment.NULL_ASSIGNMENT;
}
// Check if this queue need more resource, simply skip allocation if this
@@ -396,7 +396,7 @@ public synchronized CSAssignment assignContainers(Resource clusterResource,
+ ", because it doesn't need more resource, schedulingMode="
+ schedulingMode.name() + " node-partition=" + node.getPartition());
}
return NULL_ASSIGNMENT;
return CSAssignment.NULL_ASSIGNMENT;
}
CSAssignment assignment =

AllocationState.java

@@ -0,0 +1,28 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocator;
public enum AllocationState {
APP_SKIPPED,
PRIORITY_SKIPPED,
LOCALITY_SKIPPED,
QUEUE_SKIPPED,
ALLOCATED,
RESERVED
}
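
For orientation, a standalone sketch of what each state tells the calling
scheduler loop to do next. This is illustrative only, not part of this commit;
the enum is inlined so the example compiles by itself.

// Illustrative only, not part of this commit.
public class AllocationStateDemo {
  enum AllocationState {
    APP_SKIPPED, PRIORITY_SKIPPED, LOCALITY_SKIPPED, QUEUE_SKIPPED,
    ALLOCATED, RESERVED
  }

  static String nextStep(AllocationState state) {
    switch (state) {
      case PRIORITY_SKIPPED: return "try the application's next priority";
      case APP_SKIPPED:      return "stop trying this application on this node";
      case LOCALITY_SKIPPED: return "fall through to the next locality level";
      case QUEUE_SKIPPED:    return "queue limits (e.g. headroom) blocked it";
      case ALLOCATED:        return "commit the new container in doAllocation";
      case RESERVED:         return "commit the reservation in doAllocation";
      default: throw new IllegalArgumentException();
    }
  }

  public static void main(String[] args) {
    for (AllocationState s : AllocationState.values()) {
      System.out.println(s + " -> " + nextStep(s));
    }
  }
}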

ContainerAllocation.java

@@ -0,0 +1,76 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocator;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.util.resource.Resources;
public class ContainerAllocation {
public static final ContainerAllocation PRIORITY_SKIPPED =
new ContainerAllocation(null, null, AllocationState.PRIORITY_SKIPPED);
public static final ContainerAllocation APP_SKIPPED =
new ContainerAllocation(null, null, AllocationState.APP_SKIPPED);
public static final ContainerAllocation QUEUE_SKIPPED =
new ContainerAllocation(null, null, AllocationState.QUEUE_SKIPPED);
public static final ContainerAllocation LOCALITY_SKIPPED =
new ContainerAllocation(null, null, AllocationState.LOCALITY_SKIPPED);
RMContainer containerToBeUnreserved;
private Resource resourceToBeAllocated = Resources.none();
AllocationState state;
NodeType containerNodeType = NodeType.NODE_LOCAL;
NodeType requestNodeType = NodeType.NODE_LOCAL;
Container updatedContainer;
public ContainerAllocation(RMContainer containerToBeUnreserved,
Resource resourceToBeAllocated, AllocationState state) {
this.containerToBeUnreserved = containerToBeUnreserved;
this.resourceToBeAllocated = resourceToBeAllocated;
this.state = state;
}
public RMContainer getContainerToBeUnreserved() {
return containerToBeUnreserved;
}
public Resource getResourceToBeAllocated() {
if (resourceToBeAllocated == null) {
return Resources.none();
}
return resourceToBeAllocated;
}
public AllocationState getAllocationState() {
return state;
}
public NodeType getContainerNodeType() {
return containerNodeType;
}
public Container getUpdatedContainer() {
return updatedContainer;
}
}
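
A hedged usage sketch of this result object, not part of this commit. It is
placed in the same allocator package so it can reach the package-private
containerNodeType field, and the resource numbers are fabricated.

package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocator;

import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.util.resource.Resources;

// Illustrative only, not part of this commit.
public class ContainerAllocationDemo {
  public static void main(String[] args) {
    // A successful allocation of <2048 MB, 1 vcore>; nothing to unreserve.
    ContainerAllocation ok = new ContainerAllocation(null,
        Resources.createResource(2048, 1), AllocationState.ALLOCATED);
    ok.containerNodeType = NodeType.RACK_LOCAL; // where the container landed

    System.out.println(ok.getAllocationState());       // ALLOCATED
    System.out.println(ok.getResourceToBeAllocated()); // <memory:2048, vCores:1>

    // The skip cases are shared singletons whose resource defaults to none.
    System.out.println(ContainerAllocation.LOCALITY_SKIPPED
        .getResourceToBeAllocated()); // <memory:0, vCores:0>
  }
}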

ContainerAllocator.java

@@ -0,0 +1,115 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocator;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;
/**
* Given an application, its resource limits, and its resource requests,
* decide how to allocate containers. This makes application resource
* allocation logic extensible.
*/
public abstract class ContainerAllocator {
FiCaSchedulerApp application;
final ResourceCalculator rc;
final RMContext rmContext;
public ContainerAllocator(FiCaSchedulerApp application,
ResourceCalculator rc, RMContext rmContext) {
this.application = application;
this.rc = rc;
this.rmContext = rmContext;
}
/**
* preAllocation performs checks to decide whether a container can be
* allocated, and records the necessary information in the returned
* {@link ContainerAllocation}.
*/
abstract ContainerAllocation preAllocation(
Resource clusterResource, FiCaSchedulerNode node,
SchedulingMode schedulingMode, ResourceLimits resourceLimits,
Priority priority, RMContainer reservedContainer);
/**
* doAllocation updates application metrics, creates containers, etc.,
* according to the allocation conclusion reached by preAllocation.
*/
abstract ContainerAllocation doAllocation(
ContainerAllocation allocationResult, Resource clusterResource,
FiCaSchedulerNode node, SchedulingMode schedulingMode, Priority priority,
RMContainer reservedContainer);
boolean checkHeadroom(Resource clusterResource,
ResourceLimits currentResourceLimits, Resource required,
FiCaSchedulerNode node) {
// If headroom + currentReservation < required, we cannot allocate this
// require
Resource resourceCouldBeUnReserved = application.getCurrentReservation();
if (!application.getCSLeafQueue().getReservationContinueLooking()
|| !node.getPartition().equals(RMNodeLabelsManager.NO_LABEL)) {
// If we don't allow reservation continuous looking, OR we're looking at
// non-default node partition, we won't allow to unreserve before
// allocation.
resourceCouldBeUnReserved = Resources.none();
}
return Resources.greaterThanOrEqual(rc, clusterResource, Resources.add(
currentResourceLimits.getHeadroom(), resourceCouldBeUnReserved),
required);
}
/**
* allocate needs to handle the following:
*
* <ul>
* <li>Select request: Select a request to allocate. E.g. select a resource
* request based on requirement/priority/locality.</li>
* <li>Check if a given resource can be allocated based on resource
* availability</li>
* <li>Do allocation: decide on and create the allocated/reserved
* container; this also updates metrics</li>
* </ul>
*/
public ContainerAllocation allocate(Resource clusterResource,
FiCaSchedulerNode node, SchedulingMode schedulingMode,
ResourceLimits resourceLimits, Priority priority,
RMContainer reservedContainer) {
ContainerAllocation result =
preAllocation(clusterResource, node, schedulingMode,
resourceLimits, priority, reservedContainer);
if (AllocationState.ALLOCATED == result.state
|| AllocationState.RESERVED == result.state) {
result = doAllocation(result, clusterResource, node,
schedulingMode, priority, reservedContainer);
}
return result;
}
}
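
The allocate() template above, where preAllocation decides and doAllocation
commits, is the seam this refactoring opens. Below is a hedged sketch of a
custom allocator; the class, its policy, and the minFreeMemoryMb knob are all
hypothetical and not part of this commit. It layers one extra pre-check on top
of RegularContainerAllocator (introduced next) by delegation.

package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocator;

import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;

// Hypothetical example, not part of this commit: refuses to allocate on nodes
// below a made-up free-memory floor, and delegates everything else.
public class MemoryFloorContainerAllocator extends ContainerAllocator {
  private final RegularContainerAllocator delegate;
  private final int minFreeMemoryMb; // fabricated policy knob

  public MemoryFloorContainerAllocator(FiCaSchedulerApp application,
      ResourceCalculator rc, RMContext rmContext, int minFreeMemoryMb) {
    super(application, rc, rmContext);
    this.delegate = new RegularContainerAllocator(application, rc, rmContext);
    this.minFreeMemoryMb = minFreeMemoryMb;
  }

  @Override
  ContainerAllocation preAllocation(Resource clusterResource,
      FiCaSchedulerNode node, SchedulingMode schedulingMode,
      ResourceLimits resourceLimits, Priority priority,
      RMContainer reservedContainer) {
    // Extra policy check layered on top of the regular pre-checks.
    if (node.getAvailableResource().getMemory() < minFreeMemoryMb) {
      return ContainerAllocation.APP_SKIPPED; // skip this app on this node
    }
    return delegate.preAllocation(clusterResource, node, schedulingMode,
        resourceLimits, priority, reservedContainer);
  }

  @Override
  ContainerAllocation doAllocation(ContainerAllocation allocationResult,
      Resource clusterResource, FiCaSchedulerNode node,
      SchedulingMode schedulingMode, Priority priority,
      RMContainer reservedContainer) {
    return delegate.doAllocation(allocationResult, clusterResource, node,
        schedulingMode, priority, reservedContainer);
  }
}

FiCaSchedulerApp (further below) instantiates RegularContainerAllocator
directly; swapping in a subclass like this is the kind of extension the new
ContainerAllocator hierarchy is meant to allow.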

RegularContainerAllocator.java

@@ -0,0 +1,629 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocator;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;
/**
* Allocates normal (new) containers, taking locality, node labels, etc. into
* account. Uses the delayed scheduling mechanism to get better locality.
*/
public class RegularContainerAllocator extends ContainerAllocator {
private static final Log LOG = LogFactory.getLog(RegularContainerAllocator.class);
private ResourceRequest lastResourceRequest = null;
public RegularContainerAllocator(FiCaSchedulerApp application,
ResourceCalculator rc, RMContext rmContext) {
super(application, rc, rmContext);
}
private ContainerAllocation preCheckForNewContainer(Resource clusterResource,
FiCaSchedulerNode node, SchedulingMode schedulingMode,
ResourceLimits resourceLimits, Priority priority) {
if (SchedulerAppUtils.isBlacklisted(application, node, LOG)) {
return ContainerAllocation.APP_SKIPPED;
}
ResourceRequest anyRequest =
application.getResourceRequest(priority, ResourceRequest.ANY);
if (null == anyRequest) {
return ContainerAllocation.PRIORITY_SKIPPED;
}
// Required resource
Resource required = anyRequest.getCapability();
// Do we need containers at this 'priority'?
if (application.getTotalRequiredResources(priority) <= 0) {
return ContainerAllocation.PRIORITY_SKIPPED;
}
// AM container allocation doesn't support non-exclusive allocation to
// avoid painful of preempt an AM container
if (schedulingMode == SchedulingMode.IGNORE_PARTITION_EXCLUSIVITY) {
RMAppAttempt rmAppAttempt =
rmContext.getRMApps().get(application.getApplicationId())
.getCurrentAppAttempt();
if (rmAppAttempt.getSubmissionContext().getUnmanagedAM() == false
&& null == rmAppAttempt.getMasterContainer()) {
if (LOG.isDebugEnabled()) {
LOG.debug("Skip allocating AM container to app_attempt="
+ application.getApplicationAttemptId()
+ ", don't allow to allocate AM container in non-exclusive mode");
}
return ContainerAllocation.APP_SKIPPED;
}
}
// Is the node-label-expression of this offswitch resource request
// matches the node's label?
// If not match, jump to next priority.
if (!SchedulerUtils.checkResourceRequestMatchingNodePartition(anyRequest,
node.getPartition(), schedulingMode)) {
return ContainerAllocation.PRIORITY_SKIPPED;
}
if (!application.getCSLeafQueue().getReservationContinueLooking()) {
if (!shouldAllocOrReserveNewContainer(priority, required)) {
if (LOG.isDebugEnabled()) {
LOG.debug("doesn't need containers based on reservation algo!");
}
return ContainerAllocation.PRIORITY_SKIPPED;
}
}
if (!checkHeadroom(clusterResource, resourceLimits, required, node)) {
if (LOG.isDebugEnabled()) {
LOG.debug("cannot allocate required resource=" + required
+ " because of headroom");
}
return ContainerAllocation.QUEUE_SKIPPED;
}
// Inform the application it is about to get a scheduling opportunity
application.addSchedulingOpportunity(priority);
// Increase missed-non-partitioned-resource-request-opportunity.
// This is to make sure non-partitioned-resource-request will prefer
// to be allocated to non-partitioned nodes
int missedNonPartitionedRequestSchedulingOpportunity = 0;
if (anyRequest.getNodeLabelExpression()
.equals(RMNodeLabelsManager.NO_LABEL)) {
missedNonPartitionedRequestSchedulingOpportunity =
application
.addMissedNonPartitionedRequestSchedulingOpportunity(priority);
}
if (schedulingMode == SchedulingMode.IGNORE_PARTITION_EXCLUSIVITY) {
// Before doing allocation, we need to check scheduling opportunity to
// make sure : non-partitioned resource request should be scheduled to
// non-partitioned partition first.
if (missedNonPartitionedRequestSchedulingOpportunity < rmContext
.getScheduler().getNumClusterNodes()) {
if (LOG.isDebugEnabled()) {
LOG.debug("Skip app_attempt=" + application.getApplicationAttemptId()
+ " priority=" + priority
+ " because missed-non-partitioned-resource-request"
+ " opportunity under requred:" + " Now="
+ missedNonPartitionedRequestSchedulingOpportunity + " required="
+ rmContext.getScheduler().getNumClusterNodes());
}
return ContainerAllocation.APP_SKIPPED;
}
}
return null;
}
@Override
ContainerAllocation preAllocation(Resource clusterResource,
FiCaSchedulerNode node, SchedulingMode schedulingMode,
ResourceLimits resourceLimits, Priority priority,
RMContainer reservedContainer) {
ContainerAllocation result;
if (null == reservedContainer) {
// pre-check when allocating new container
result =
preCheckForNewContainer(clusterResource, node, schedulingMode,
resourceLimits, priority);
if (null != result) {
return result;
}
} else {
// pre-check when allocating reserved container
if (application.getTotalRequiredResources(priority) == 0) {
// Release
return new ContainerAllocation(reservedContainer, null,
AllocationState.QUEUE_SKIPPED);
}
}
// Try to allocate containers on node
result =
assignContainersOnNode(clusterResource, node, priority,
reservedContainer, schedulingMode, resourceLimits);
if (null == reservedContainer) {
if (result.state == AllocationState.PRIORITY_SKIPPED) {
// Don't count 'skipped nodes' as a scheduling opportunity!
application.subtractSchedulingOpportunity(priority);
}
}
return result;
}
public synchronized float getLocalityWaitFactor(
Priority priority, int clusterNodes) {
// Estimate: Required unique resources (i.e. hosts + racks)
int requiredResources =
Math.max(application.getResourceRequests(priority).size() - 1, 0);
// waitFactor can't be more than '1'
// i.e. no point skipping more than clustersize opportunities
return Math.min(((float)requiredResources / clusterNodes), 1.0f);
}
private int getActualNodeLocalityDelay() {
return Math.min(rmContext.getScheduler().getNumClusterNodes(), application
.getCSLeafQueue().getNodeLocalityDelay());
}
private boolean canAssign(Priority priority, FiCaSchedulerNode node,
NodeType type, RMContainer reservedContainer) {
// Clearly we need containers for this application...
if (type == NodeType.OFF_SWITCH) {
if (reservedContainer != null) {
return true;
}
// 'Delay' off-switch
ResourceRequest offSwitchRequest =
application.getResourceRequest(priority, ResourceRequest.ANY);
long missedOpportunities = application.getSchedulingOpportunities(priority);
long requiredContainers = offSwitchRequest.getNumContainers();
float localityWaitFactor =
getLocalityWaitFactor(priority, rmContext.getScheduler()
.getNumClusterNodes());
return ((requiredContainers * localityWaitFactor) < missedOpportunities);
}
// Check if we need containers on this rack
ResourceRequest rackLocalRequest =
application.getResourceRequest(priority, node.getRackName());
if (rackLocalRequest == null || rackLocalRequest.getNumContainers() <= 0) {
return false;
}
// If we are here, we do need containers on this rack for RACK_LOCAL req
if (type == NodeType.RACK_LOCAL) {
// 'Delay' rack-local just a little bit...
long missedOpportunities = application.getSchedulingOpportunities(priority);
return getActualNodeLocalityDelay() < missedOpportunities;
}
// Check if we need containers on this host
if (type == NodeType.NODE_LOCAL) {
// Now check if we need containers on this host...
ResourceRequest nodeLocalRequest =
application.getResourceRequest(priority, node.getNodeName());
if (nodeLocalRequest != null) {
return nodeLocalRequest.getNumContainers() > 0;
}
}
return false;
}
private ContainerAllocation assignNodeLocalContainers(
Resource clusterResource, ResourceRequest nodeLocalResourceRequest,
FiCaSchedulerNode node, Priority priority, RMContainer reservedContainer,
SchedulingMode schedulingMode, ResourceLimits currentResoureLimits) {
if (canAssign(priority, node, NodeType.NODE_LOCAL, reservedContainer)) {
return assignContainer(clusterResource, node, priority,
nodeLocalResourceRequest, NodeType.NODE_LOCAL, reservedContainer,
schedulingMode, currentResoureLimits);
}
// Skip node-local request, go to rack-local request
return ContainerAllocation.LOCALITY_SKIPPED;
}
private ContainerAllocation assignRackLocalContainers(
Resource clusterResource, ResourceRequest rackLocalResourceRequest,
FiCaSchedulerNode node, Priority priority, RMContainer reservedContainer,
SchedulingMode schedulingMode, ResourceLimits currentResoureLimits) {
if (canAssign(priority, node, NodeType.RACK_LOCAL, reservedContainer)) {
return assignContainer(clusterResource, node, priority,
rackLocalResourceRequest, NodeType.RACK_LOCAL, reservedContainer,
schedulingMode, currentResoureLimits);
}
// Skip rack-local request, go to off-switch request
return ContainerAllocation.LOCALITY_SKIPPED;
}
private ContainerAllocation assignOffSwitchContainers(
Resource clusterResource, ResourceRequest offSwitchResourceRequest,
FiCaSchedulerNode node, Priority priority, RMContainer reservedContainer,
SchedulingMode schedulingMode, ResourceLimits currentResoureLimits) {
if (canAssign(priority, node, NodeType.OFF_SWITCH, reservedContainer)) {
return assignContainer(clusterResource, node, priority,
offSwitchResourceRequest, NodeType.OFF_SWITCH, reservedContainer,
schedulingMode, currentResoureLimits);
}
return ContainerAllocation.QUEUE_SKIPPED;
}
private ContainerAllocation assignContainersOnNode(Resource clusterResource,
FiCaSchedulerNode node, Priority priority, RMContainer reservedContainer,
SchedulingMode schedulingMode, ResourceLimits currentResoureLimits) {
ContainerAllocation assigned;
NodeType requestType = null;
// Data-local
ResourceRequest nodeLocalResourceRequest =
application.getResourceRequest(priority, node.getNodeName());
if (nodeLocalResourceRequest != null) {
requestType = NodeType.NODE_LOCAL;
assigned =
assignNodeLocalContainers(clusterResource, nodeLocalResourceRequest,
node, priority, reservedContainer, schedulingMode,
currentResoureLimits);
if (Resources.greaterThan(rc, clusterResource,
assigned.getResourceToBeAllocated(), Resources.none())) {
assigned.requestNodeType = requestType;
return assigned;
}
}
// Rack-local
ResourceRequest rackLocalResourceRequest =
application.getResourceRequest(priority, node.getRackName());
if (rackLocalResourceRequest != null) {
if (!rackLocalResourceRequest.getRelaxLocality()) {
return ContainerAllocation.PRIORITY_SKIPPED;
}
if (requestType != NodeType.NODE_LOCAL) {
requestType = NodeType.RACK_LOCAL;
}
assigned =
assignRackLocalContainers(clusterResource, rackLocalResourceRequest,
node, priority, reservedContainer, schedulingMode,
currentResoureLimits);
if (Resources.greaterThan(rc, clusterResource,
assigned.getResourceToBeAllocated(), Resources.none())) {
assigned.requestNodeType = requestType;
return assigned;
}
}
// Off-switch
ResourceRequest offSwitchResourceRequest =
application.getResourceRequest(priority, ResourceRequest.ANY);
if (offSwitchResourceRequest != null) {
if (!offSwitchResourceRequest.getRelaxLocality()) {
return ContainerAllocation.PRIORITY_SKIPPED;
}
if (requestType != NodeType.NODE_LOCAL
&& requestType != NodeType.RACK_LOCAL) {
requestType = NodeType.OFF_SWITCH;
}
assigned =
assignOffSwitchContainers(clusterResource, offSwitchResourceRequest,
node, priority, reservedContainer, schedulingMode,
currentResoureLimits);
assigned.requestNodeType = requestType;
return assigned;
}
return ContainerAllocation.PRIORITY_SKIPPED;
}
private ContainerAllocation assignContainer(Resource clusterResource,
FiCaSchedulerNode node, Priority priority, ResourceRequest request,
NodeType type, RMContainer rmContainer, SchedulingMode schedulingMode,
ResourceLimits currentResoureLimits) {
lastResourceRequest = request;
if (LOG.isDebugEnabled()) {
LOG.debug("assignContainers: node=" + node.getNodeName()
+ " application=" + application.getApplicationId()
+ " priority=" + priority.getPriority()
+ " request=" + request + " type=" + type);
}
// check if the resource request can access the label
if (!SchedulerUtils.checkResourceRequestMatchingNodePartition(request,
node.getPartition(), schedulingMode)) {
// this is a reserved container, but we cannot allocate it now according
// to label not match. This can be caused by node label changed
// We should un-reserve this container.
return new ContainerAllocation(rmContainer, null,
AllocationState.QUEUE_SKIPPED);
}
Resource capability = request.getCapability();
Resource available = node.getAvailableResource();
Resource totalResource = node.getTotalResource();
if (!Resources.lessThanOrEqual(rc, clusterResource,
capability, totalResource)) {
LOG.warn("Node : " + node.getNodeID()
+ " does not have sufficient resource for request : " + request
+ " node total capability : " + node.getTotalResource());
return ContainerAllocation.QUEUE_SKIPPED;
}
assert Resources.greaterThan(
rc, clusterResource, available, Resources.none());
boolean shouldAllocOrReserveNewContainer = shouldAllocOrReserveNewContainer(
priority, capability);
// Can we allocate a container on this node?
int availableContainers =
rc.computeAvailableContainers(available, capability);
// How much need to unreserve equals to:
// max(required - headroom, amountNeedUnreserve)
Resource resourceNeedToUnReserve =
Resources.max(rc, clusterResource,
Resources.subtract(capability, currentResoureLimits.getHeadroom()),
currentResoureLimits.getAmountNeededUnreserve());
boolean needToUnreserve =
Resources.greaterThan(rc, clusterResource,
resourceNeedToUnReserve, Resources.none());
RMContainer unreservedContainer = null;
boolean reservationsContinueLooking =
application.getCSLeafQueue().getReservationContinueLooking();
if (availableContainers > 0) {
// Allocate...
// We will only do continuous reservation when this is not allocated from
// reserved container
if (rmContainer == null && reservationsContinueLooking
&& node.getLabels().isEmpty()) {
// when reservationsContinueLooking is set, we may need to unreserve
// some containers to meet this queue, its parents', or the users'
// resource limits.
// TODO, need change here when we want to support continuous reservation
// looking for labeled partitions.
if (!shouldAllocOrReserveNewContainer || needToUnreserve) {
if (!needToUnreserve) {
// If we shouldn't allocate/reserve new container then we should
// unreserve one the same size we are asking for since the
// currentResoureLimits.getAmountNeededUnreserve could be zero. If
// the limit was hit then use the amount we need to unreserve to be
// under the limit.
resourceNeedToUnReserve = capability;
}
unreservedContainer =
application.findNodeToUnreserve(clusterResource, node, priority,
resourceNeedToUnReserve);
// When (minimum-unreserved-resource > 0 OR we cannot allocate
// new/reserved
// container (That means we *have to* unreserve some resource to
// continue)). If we failed to unreserve some resource, we can't
// continue.
if (null == unreservedContainer) {
return ContainerAllocation.QUEUE_SKIPPED;
}
}
}
ContainerAllocation result =
new ContainerAllocation(unreservedContainer, request.getCapability(),
AllocationState.ALLOCATED);
result.containerNodeType = type;
return result;
} else {
// if we are allowed to allocate but this node doesn't have space, reserve it or
// if this was an already a reserved container, reserve it again
if (shouldAllocOrReserveNewContainer || rmContainer != null) {
if (reservationsContinueLooking && rmContainer == null) {
// we could possibly ignoring queue capacity or user limits when
// reservationsContinueLooking is set. Make sure we didn't need to unreserve
// one.
if (needToUnreserve) {
if (LOG.isDebugEnabled()) {
LOG.debug("we needed to unreserve to be able to allocate");
}
return ContainerAllocation.QUEUE_SKIPPED;
}
}
ContainerAllocation result =
new ContainerAllocation(null, request.getCapability(),
AllocationState.RESERVED);
result.containerNodeType = type;
return result;
}
return ContainerAllocation.QUEUE_SKIPPED;
}
}
boolean
shouldAllocOrReserveNewContainer(Priority priority, Resource required) {
int requiredContainers = application.getTotalRequiredResources(priority);
int reservedContainers = application.getNumReservedContainers(priority);
int starvation = 0;
if (reservedContainers > 0) {
float nodeFactor =
Resources
.ratio(rc, required, application.getCSLeafQueue().getMaximumAllocation());
// Use percentage of node required to bias against large containers...
// Protect against corner case where you need the whole node with
// Math.min(nodeFactor, minimumAllocationFactor)
starvation =
(int) ((application.getReReservations(priority) /
(float) reservedContainers) * (1.0f - (Math.min(
nodeFactor, application.getCSLeafQueue()
.getMinimumAllocationFactor()))));
if (LOG.isDebugEnabled()) {
LOG.debug("needsContainers:" + " app.#re-reserve="
+ application.getReReservations(priority) + " reserved="
+ reservedContainers + " nodeFactor=" + nodeFactor
+ " minAllocFactor="
+ application.getCSLeafQueue().getMinimumAllocationFactor()
+ " starvation=" + starvation);
}
}
return (((starvation + requiredContainers) - reservedContainers) > 0);
}
private Container getContainer(RMContainer rmContainer,
FiCaSchedulerNode node, Resource capability, Priority priority) {
return (rmContainer != null) ? rmContainer.getContainer()
: createContainer(node, capability, priority);
}
private Container createContainer(FiCaSchedulerNode node, Resource capability,
Priority priority) {
NodeId nodeId = node.getRMNode().getNodeID();
ContainerId containerId =
BuilderUtils.newContainerId(application.getApplicationAttemptId(),
application.getNewContainerId());
// Create the container
return BuilderUtils.newContainer(containerId, nodeId, node.getRMNode()
.getHttpAddress(), capability, priority, null);
}
private ContainerAllocation handleNewContainerAllocation(
ContainerAllocation allocationResult, FiCaSchedulerNode node,
Priority priority, RMContainer reservedContainer, Container container) {
// Handling container allocation
// Did we previously reserve containers at this 'priority'?
if (reservedContainer != null) {
application.unreserve(priority, node, reservedContainer);
}
// Inform the application
RMContainer allocatedContainer =
application.allocate(allocationResult.containerNodeType, node,
priority, lastResourceRequest, container);
// Does the application need this resource?
if (allocatedContainer == null) {
// Skip this app if we failed to allocate.
ContainerAllocation ret =
new ContainerAllocation(allocationResult.containerToBeUnreserved,
null, AllocationState.QUEUE_SKIPPED);
ret.state = AllocationState.APP_SKIPPED;
return ret;
}
// Inform the node
node.allocateContainer(allocatedContainer);
// update locality statistics
application.incNumAllocatedContainers(allocationResult.containerNodeType,
allocationResult.requestNodeType);
return allocationResult;
}
@Override
ContainerAllocation doAllocation(ContainerAllocation allocationResult,
Resource clusterResource, FiCaSchedulerNode node,
SchedulingMode schedulingMode, Priority priority,
RMContainer reservedContainer) {
// Create the container if necessary
Container container =
getContainer(reservedContainer, node,
allocationResult.getResourceToBeAllocated(), priority);
// something went wrong getting/creating the container
if (container == null) {
LOG.warn("Couldn't get container for allocation!");
return ContainerAllocation.QUEUE_SKIPPED;
}
if (allocationResult.getAllocationState() == AllocationState.ALLOCATED) {
// When allocating container
allocationResult =
handleNewContainerAllocation(allocationResult, node, priority,
reservedContainer, container);
} else {
// When reserving container
application.reserve(priority, node, reservedContainer, container);
}
allocationResult.updatedContainer = container;
// Only reset opportunities when we FIRST allocate the container. (IAW, When
// reservedContainer != null, it's not the first time)
if (reservedContainer == null) {
// Don't reset scheduling opportunities for off-switch assignments
// otherwise the app will be delayed for each non-local assignment.
// This helps apps with many off-cluster requests schedule faster.
if (allocationResult.containerNodeType != NodeType.OFF_SWITCH) {
if (LOG.isDebugEnabled()) {
LOG.debug("Resetting scheduling opportunities");
}
application.resetSchedulingOpportunities(priority);
}
// Non-exclusive scheduling opportunity is different: we need reset
// it every time to make sure non-labeled resource request will be
// most likely allocated on non-labeled nodes first.
application.resetMissedNonPartitionedRequestSchedulingOpportunity(priority);
}
return allocationResult;
}
}
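
Two pieces of arithmetic above are easy to misread: the off-switch delay in
canAssign, which waits until missed scheduling opportunities exceed
requiredContainers * localityWaitFactor, and the starvation estimate in
shouldAllocOrReserveNewContainer. A standalone worked example with fabricated
numbers, not part of this commit:

// Standalone worked example, not part of this commit; all numbers fabricated.
public class DelaySchedulingMath {
  public static void main(String[] args) {
    // Locality wait factor: with requests on 4 hosts + 1 rack + ANY, the app
    // has 6 ResourceRequests at this priority, so requiredResources = 6 - 1.
    int clusterNodes = 100;
    float localityWaitFactor = Math.min(5f / clusterNodes, 1.0f); // 0.05
    long requiredContainers = 10;
    long missedOpportunities = 3;
    // Off-switch is allowed once misses exceed required * waitFactor.
    boolean offSwitchOk =
        requiredContainers * localityWaitFactor < missedOpportunities; // 0.5 < 3
    System.out.println("allow off-switch: " + offSwitchOk); // true

    // Starvation estimate: re-reservations per reserved container, damped by
    // container size relative to the node (nodeFactor vs. minAllocFactor).
    int reservedContainers = 2;
    int reReservations = 6;
    float nodeFactor = 0.25f;    // required / queue maximum-allocation
    float minAllocFactor = 0.5f; // queue's minimum-allocation factor
    int starvation = (int) ((reReservations / (float) reservedContainers)
        * (1.0f - Math.min(nodeFactor, minAllocFactor))); // (int) 2.25 = 2
    int requiredAtPriority = 1;
    boolean allocOrReserveNew =
        starvation + requiredAtPriority - reservedContainers > 0; // 1 > 0
    System.out.println("alloc/reserve new: " + allocOrReserveNew); // true
  }
}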

FiCaSchedulerApp.java

@@ -24,7 +24,6 @@
import java.util.Map;
import java.util.Set;
import org.apache.commons.lang.mutable.MutableObject;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -40,9 +39,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger;
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
@@ -54,15 +51,16 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAssignment;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityHeadroomProvider;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocator.AllocationState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocator.ContainerAllocator;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocator.RegularContainerAllocator;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocator.ContainerAllocation;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;
@@ -78,11 +76,6 @@
public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
private static final Log LOG = LogFactory.getLog(FiCaSchedulerApp.class);
static final CSAssignment NULL_ASSIGNMENT =
new CSAssignment(Resources.createResource(0, 0), NodeType.NODE_LOCAL);
static final CSAssignment SKIP_ASSIGNMENT = new CSAssignment(true);
private final Set<ContainerId> containersToPreempt =
new HashSet<ContainerId>();
@@ -91,6 +84,8 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
private ResourceCalculator rc = new DefaultResourceCalculator();
private ResourceScheduler scheduler;
private ContainerAllocator containerAllocator;
public FiCaSchedulerApp(ApplicationAttemptId applicationAttemptId,
String user, Queue queue, ActiveUsersManager activeUsersManager,
@@ -124,6 +119,8 @@ public FiCaSchedulerApp(ApplicationAttemptId applicationAttemptId,
if (scheduler.getResourceCalculator() != null) {
rc = scheduler.getResourceCalculator();
}
containerAllocator = new RegularContainerAllocator(this, rc, rmContext);
}
synchronized public boolean containerCompleted(RMContainer rmContainer,
@@ -386,223 +383,6 @@ public synchronized void transferStateFromPreviousAttempt(
((FiCaSchedulerApp) appAttempt).getHeadroomProvider();
}
private int getActualNodeLocalityDelay() {
return Math.min(scheduler.getNumClusterNodes(), getCSLeafQueue()
.getNodeLocalityDelay());
}
private boolean canAssign(Priority priority, FiCaSchedulerNode node,
NodeType type, RMContainer reservedContainer) {
// Clearly we need containers for this application...
if (type == NodeType.OFF_SWITCH) {
if (reservedContainer != null) {
return true;
}
// 'Delay' off-switch
ResourceRequest offSwitchRequest =
getResourceRequest(priority, ResourceRequest.ANY);
long missedOpportunities = getSchedulingOpportunities(priority);
long requiredContainers = offSwitchRequest.getNumContainers();
float localityWaitFactor =
getLocalityWaitFactor(priority, scheduler.getNumClusterNodes());
return ((requiredContainers * localityWaitFactor) < missedOpportunities);
}
// Check if we need containers on this rack
ResourceRequest rackLocalRequest =
getResourceRequest(priority, node.getRackName());
if (rackLocalRequest == null || rackLocalRequest.getNumContainers() <= 0) {
return false;
}
// If we are here, we do need containers on this rack for RACK_LOCAL req
if (type == NodeType.RACK_LOCAL) {
// 'Delay' rack-local just a little bit...
long missedOpportunities = getSchedulingOpportunities(priority);
return getActualNodeLocalityDelay() < missedOpportunities;
}
// Check if we need containers on this host
if (type == NodeType.NODE_LOCAL) {
// Now check if we need containers on this host...
ResourceRequest nodeLocalRequest =
getResourceRequest(priority, node.getNodeName());
if (nodeLocalRequest != null) {
return nodeLocalRequest.getNumContainers() > 0;
}
}
return false;
}
boolean
shouldAllocOrReserveNewContainer(Priority priority, Resource required) {
int requiredContainers = getTotalRequiredResources(priority);
int reservedContainers = getNumReservedContainers(priority);
int starvation = 0;
if (reservedContainers > 0) {
float nodeFactor =
Resources.ratio(
rc, required, getCSLeafQueue().getMaximumAllocation()
);
// Use percentage of node required to bias against large containers...
// Protect against corner case where you need the whole node with
// Math.min(nodeFactor, minimumAllocationFactor)
starvation =
(int)((getReReservations(priority) / (float)reservedContainers) *
(1.0f - (Math.min(nodeFactor, getCSLeafQueue().getMinimumAllocationFactor())))
);
if (LOG.isDebugEnabled()) {
LOG.debug("needsContainers:" +
" app.#re-reserve=" + getReReservations(priority) +
" reserved=" + reservedContainers +
" nodeFactor=" + nodeFactor +
" minAllocFactor=" + getCSLeafQueue().getMinimumAllocationFactor() +
" starvation=" + starvation);
}
}
return (((starvation + requiredContainers) - reservedContainers) > 0);
}
private CSAssignment assignNodeLocalContainers(Resource clusterResource,
ResourceRequest nodeLocalResourceRequest, FiCaSchedulerNode node,
Priority priority,
RMContainer reservedContainer, MutableObject allocatedContainer,
SchedulingMode schedulingMode, ResourceLimits currentResoureLimits) {
if (canAssign(priority, node, NodeType.NODE_LOCAL,
reservedContainer)) {
return assignContainer(clusterResource, node, priority,
nodeLocalResourceRequest, NodeType.NODE_LOCAL, reservedContainer,
allocatedContainer, schedulingMode, currentResoureLimits);
}
return new CSAssignment(Resources.none(), NodeType.NODE_LOCAL);
}
private CSAssignment assignRackLocalContainers(Resource clusterResource,
ResourceRequest rackLocalResourceRequest, FiCaSchedulerNode node,
Priority priority,
RMContainer reservedContainer, MutableObject allocatedContainer,
SchedulingMode schedulingMode, ResourceLimits currentResoureLimits) {
if (canAssign(priority, node, NodeType.RACK_LOCAL,
reservedContainer)) {
return assignContainer(clusterResource, node, priority,
rackLocalResourceRequest, NodeType.RACK_LOCAL, reservedContainer,
allocatedContainer, schedulingMode, currentResoureLimits);
}
return new CSAssignment(Resources.none(), NodeType.RACK_LOCAL);
}
private CSAssignment assignOffSwitchContainers(Resource clusterResource,
ResourceRequest offSwitchResourceRequest, FiCaSchedulerNode node,
Priority priority,
RMContainer reservedContainer, MutableObject allocatedContainer,
SchedulingMode schedulingMode, ResourceLimits currentResoureLimits) {
if (canAssign(priority, node, NodeType.OFF_SWITCH,
reservedContainer)) {
return assignContainer(clusterResource, node, priority,
offSwitchResourceRequest, NodeType.OFF_SWITCH, reservedContainer,
allocatedContainer, schedulingMode, currentResoureLimits);
}
return new CSAssignment(Resources.none(), NodeType.OFF_SWITCH);
}
private CSAssignment assignContainersOnNode(Resource clusterResource,
FiCaSchedulerNode node, Priority priority,
RMContainer reservedContainer, SchedulingMode schedulingMode,
ResourceLimits currentResoureLimits) {
CSAssignment assigned;
NodeType requestType = null;
MutableObject allocatedContainer = new MutableObject();
// Data-local
ResourceRequest nodeLocalResourceRequest =
getResourceRequest(priority, node.getNodeName());
if (nodeLocalResourceRequest != null) {
requestType = NodeType.NODE_LOCAL;
assigned =
assignNodeLocalContainers(clusterResource, nodeLocalResourceRequest,
node, priority, reservedContainer,
allocatedContainer, schedulingMode, currentResoureLimits);
if (Resources.greaterThan(rc, clusterResource,
assigned.getResource(), Resources.none())) {
//update locality statistics
if (allocatedContainer.getValue() != null) {
incNumAllocatedContainers(NodeType.NODE_LOCAL,
requestType);
}
assigned.setType(NodeType.NODE_LOCAL);
return assigned;
}
}
// Rack-local
ResourceRequest rackLocalResourceRequest =
getResourceRequest(priority, node.getRackName());
if (rackLocalResourceRequest != null) {
if (!rackLocalResourceRequest.getRelaxLocality()) {
return SKIP_ASSIGNMENT;
}
if (requestType != NodeType.NODE_LOCAL) {
requestType = NodeType.RACK_LOCAL;
}
assigned =
assignRackLocalContainers(clusterResource, rackLocalResourceRequest,
node, priority, reservedContainer,
allocatedContainer, schedulingMode, currentResoureLimits);
if (Resources.greaterThan(rc, clusterResource,
assigned.getResource(), Resources.none())) {
//update locality statistics
if (allocatedContainer.getValue() != null) {
incNumAllocatedContainers(NodeType.RACK_LOCAL,
requestType);
}
assigned.setType(NodeType.RACK_LOCAL);
return assigned;
}
}
// Off-switch
ResourceRequest offSwitchResourceRequest =
getResourceRequest(priority, ResourceRequest.ANY);
if (offSwitchResourceRequest != null) {
if (!offSwitchResourceRequest.getRelaxLocality()) {
return SKIP_ASSIGNMENT;
}
if (requestType != NodeType.NODE_LOCAL
&& requestType != NodeType.RACK_LOCAL) {
requestType = NodeType.OFF_SWITCH;
}
assigned =
assignOffSwitchContainers(clusterResource, offSwitchResourceRequest,
node, priority, reservedContainer,
allocatedContainer, schedulingMode, currentResoureLimits);
// update locality statistics
if (allocatedContainer.getValue() != null) {
incNumAllocatedContainers(NodeType.OFF_SWITCH, requestType);
}
assigned.setType(NodeType.OFF_SWITCH);
return assigned;
}
return SKIP_ASSIGNMENT;
}
public void reserve(Priority priority,
FiCaSchedulerNode node, RMContainer rmContainer, Container container) {
// Update reserved metrics if this is the first reservation
@@ -618,25 +398,6 @@ public void reserve(Priority priority,
node.reserveResource(this, priority, rmContainer);
}
private Container getContainer(RMContainer rmContainer,
FiCaSchedulerNode node, Resource capability, Priority priority) {
return (rmContainer != null) ? rmContainer.getContainer()
: createContainer(node, capability, priority);
}
Container createContainer(FiCaSchedulerNode node, Resource capability,
Priority priority) {
NodeId nodeId = node.getRMNode().getNodeID();
ContainerId containerId =
BuilderUtils.newContainerId(getApplicationAttemptId(),
getNewContainerId());
// Create the container
return BuilderUtils.newContainer(containerId, nodeId, node.getRMNode()
.getHttpAddress(), capability, priority, null);
}
@VisibleForTesting
public RMContainer findNodeToUnreserve(Resource clusterResource,
FiCaSchedulerNode node, Priority priority,
@@ -672,203 +433,63 @@ public RMContainer findNodeToUnreserve(Resource clusterResource,
return nodeToUnreserve.getReservedContainer();
}
private LeafQueue getCSLeafQueue() {
public LeafQueue getCSLeafQueue() {
return (LeafQueue)queue;
}
private CSAssignment assignContainer(Resource clusterResource, FiCaSchedulerNode node,
Priority priority,
ResourceRequest request, NodeType type, RMContainer rmContainer,
MutableObject createdContainer, SchedulingMode schedulingMode,
ResourceLimits currentResoureLimits) {
if (LOG.isDebugEnabled()) {
LOG.debug("assignContainers: node=" + node.getNodeName()
+ " application=" + getApplicationId()
+ " priority=" + priority.getPriority()
+ " request=" + request + " type=" + type);
}
private CSAssignment getCSAssignmentFromAllocateResult(
Resource clusterResource, ContainerAllocation result) {
// Handle skipped
boolean skipped =
(result.getAllocationState() == AllocationState.APP_SKIPPED);
CSAssignment assignment = new CSAssignment(skipped);
assignment.setApplication(this);
// Handle excess reservation
assignment.setExcessReservation(result.getContainerToBeUnreserved());
// check if the resource request can access the label
if (!SchedulerUtils.checkResourceRequestMatchingNodePartition(request,
node.getPartition(), schedulingMode)) {
// this is a reserved container, but we cannot allocate it now according
// to label not match. This can be caused by node label changed
// We should un-reserve this container.
if (rmContainer != null) {
unreserve(priority, node, rmContainer);
}
return new CSAssignment(Resources.none(), type);
}
// If we allocated something
if (Resources.greaterThan(rc, clusterResource,
result.getResourceToBeAllocated(), Resources.none())) {
Resource allocatedResource = result.getResourceToBeAllocated();
Container updatedContainer = result.getUpdatedContainer();
assignment.setResource(allocatedResource);
assignment.setType(result.getContainerNodeType());
Resource capability = request.getCapability();
Resource available = node.getAvailableResource();
Resource totalResource = node.getTotalResource();
if (!Resources.lessThanOrEqual(rc, clusterResource,
capability, totalResource)) {
LOG.warn("Node : " + node.getNodeID()
+ " does not have sufficient resource for request : " + request
+ " node total capability : " + node.getTotalResource());
return new CSAssignment(Resources.none(), type);
}
assert Resources.greaterThan(
rc, clusterResource, available, Resources.none());
// Create the container if necessary
Container container =
getContainer(rmContainer, node, capability, priority);
// something went wrong getting/creating the container
if (container == null) {
LOG.warn("Couldn't get container for allocation!");
return new CSAssignment(Resources.none(), type);
}
boolean shouldAllocOrReserveNewContainer = shouldAllocOrReserveNewContainer(
priority, capability);
// Can we allocate a container on this node?
int availableContainers =
rc.computeAvailableContainers(available, capability);
// How much need to unreserve equals to:
// max(required - headroom, amountNeedUnreserve)
Resource resourceNeedToUnReserve =
Resources.max(rc, clusterResource,
Resources.subtract(capability, currentResoureLimits.getHeadroom()),
currentResoureLimits.getAmountNeededUnreserve());
boolean needToUnreserve =
Resources.greaterThan(rc, clusterResource,
resourceNeedToUnReserve, Resources.none());
RMContainer unreservedContainer = null;
boolean reservationsContinueLooking =
getCSLeafQueue().getReservationContinueLooking();
if (availableContainers > 0) {
// Allocate...
// Did we previously reserve containers at this 'priority'?
if (rmContainer != null) {
unreserve(priority, node, rmContainer);
} else if (reservationsContinueLooking && node.getLabels().isEmpty()) {
// when reservationsContinueLooking is set, we may need to unreserve
// some containers to meet this queue, its parents', or the users' resource limits.
// TODO, need change here when we want to support continuous reservation
// looking for labeled partitions.
if (!shouldAllocOrReserveNewContainer || needToUnreserve) {
if (!needToUnreserve) {
// If we shouldn't allocate/reserve new container then we should
// unreserve one the same size we are asking for since the
// currentResoureLimits.getAmountNeededUnreserve could be zero. If
// the limit was hit then use the amount we need to unreserve to be
// under the limit.
resourceNeedToUnReserve = capability;
}
unreservedContainer =
findNodeToUnreserve(clusterResource, node, priority,
resourceNeedToUnReserve);
// When (minimum-unreserved-resource > 0 OR we cannot allocate new/reserved
// container (That means we *have to* unreserve some resource to
// continue)). If we failed to unreserve some resource, we can't continue.
if (null == unreservedContainer) {
return new CSAssignment(Resources.none(), type);
}
}
}
// Inform the application
RMContainer allocatedContainer =
allocate(type, node, priority, request, container);
// Does the application need this resource?
if (allocatedContainer == null) {
CSAssignment csAssignment = new CSAssignment(Resources.none(), type);
csAssignment.setApplication(this);
csAssignment.setExcessReservation(unreservedContainer);
return csAssignment;
}
// Inform the node
node.allocateContainer(allocatedContainer);
// Inform the ordering policy
getCSLeafQueue().getOrderingPolicy().containerAllocated(this,
allocatedContainer);
LOG.info("assignedContainer" +
" application attempt=" + getApplicationAttemptId() +
" container=" + container +
" queue=" + this +
" clusterResource=" + clusterResource);
createdContainer.setValue(allocatedContainer);
CSAssignment assignment = new CSAssignment(container.getResource(), type);
assignment.getAssignmentInformation().addAllocationDetails(
container.getId(), getCSLeafQueue().getQueuePath());
assignment.getAssignmentInformation().incrAllocations();
assignment.setApplication(this);
Resources.addTo(assignment.getAssignmentInformation().getAllocated(),
container.getResource());
assignment.setExcessReservation(unreservedContainer);
return assignment;
} else {
// if we are allowed to allocate but this node doesn't have space, reserve it or
// if this was an already a reserved container, reserve it again
if (shouldAllocOrReserveNewContainer || rmContainer != null) {
if (reservationsContinueLooking && rmContainer == null) {
// we could possibly ignoring queue capacity or user limits when
// reservationsContinueLooking is set. Make sure we didn't need to unreserve
// one.
if (needToUnreserve) {
if (LOG.isDebugEnabled()) {
LOG.debug("we needed to unreserve to be able to allocate");
}
return new CSAssignment(Resources.none(), type);
}
}
// Reserve by 'charging' in advance...
reserve(priority, node, rmContainer, container);
LOG.info("Reserved container " +
" application=" + getApplicationId() +
" resource=" + request.getCapability() +
" queue=" + this.toString() +
" cluster=" + clusterResource);
CSAssignment assignment =
new CSAssignment(request.getCapability(), type);
if (result.getAllocationState() == AllocationState.RESERVED) {
// This is a reserved container
LOG.info("Reserved container " + " application=" + getApplicationId()
+ " resource=" + allocatedResource + " queue="
+ this.toString() + " cluster=" + clusterResource);
assignment.getAssignmentInformation().addReservationDetails(
container.getId(), getCSLeafQueue().getQueuePath());
updatedContainer.getId(), getCSLeafQueue().getQueuePath());
assignment.getAssignmentInformation().incrReservations();
Resources.addTo(assignment.getAssignmentInformation().getReserved(),
request.getCapability());
return assignment;
allocatedResource);
assignment.setFulfilledReservation(true);
} else {
// This is a new container
// Inform the ordering policy
LOG.info("assignedContainer" + " application attempt="
+ getApplicationAttemptId() + " container="
+ updatedContainer.getId() + " queue=" + this + " clusterResource="
+ clusterResource);
getCSLeafQueue().getOrderingPolicy().containerAllocated(this,
getRMContainer(updatedContainer.getId()));
assignment.getAssignmentInformation().addAllocationDetails(
updatedContainer.getId(), getCSLeafQueue().getQueuePath());
assignment.getAssignmentInformation().incrAllocations();
Resources.addTo(assignment.getAssignmentInformation().getAllocated(),
allocatedResource);
}
return new CSAssignment(Resources.none(), type);
}
return assignment;
}
private boolean checkHeadroom(Resource clusterResource,
ResourceLimits currentResourceLimits, Resource required, FiCaSchedulerNode node) {
// If headroom + currentReservation < required, we cannot allocate this
// require
Resource resourceCouldBeUnReserved = getCurrentReservation();
if (!getCSLeafQueue().getReservationContinueLooking() || !node.getPartition().equals(RMNodeLabelsManager.NO_LABEL)) {
// If we don't allow reservation continuous looking, OR we're looking at
// non-default node partition, we won't allow to unreserve before
// allocation.
resourceCouldBeUnReserved = Resources.none();
}
return Resources
.greaterThanOrEqual(rc, clusterResource, Resources.add(
currentResourceLimits.getHeadroom(), resourceCouldBeUnReserved),
required);
}
public CSAssignment assignContainers(Resource clusterResource,
FiCaSchedulerNode node, ResourceLimits currentResourceLimits,
SchedulingMode schedulingMode) {
@@ -886,174 +507,41 @@ public CSAssignment assignContainers(Resource clusterResource,
+ ", because it doesn't need more resource, schedulingMode="
+ schedulingMode.name() + " node-label=" + node.getPartition());
}
return SKIP_ASSIGNMENT;
return CSAssignment.SKIP_ASSIGNMENT;
}
synchronized (this) {
// Check if this resource is on the blacklist
if (SchedulerAppUtils.isBlacklisted(this, node, LOG)) {
return SKIP_ASSIGNMENT;
}
// Schedule in priority order
for (Priority priority : getPriorities()) {
ContainerAllocation allocationResult =
containerAllocator.allocate(clusterResource, node,
schedulingMode, currentResourceLimits, priority, null);
// If it's a skipped allocation
AllocationState allocationState = allocationResult.getAllocationState();
if (allocationState == AllocationState.PRIORITY_SKIPPED) {
continue;
}
return getCSAssignmentFromAllocateResult(clusterResource,
allocationResult);
}
}
// We will reach here if we skipped all priorities of the app, so we will
// skip the app.
return CSAssignment.SKIP_ASSIGNMENT;
}
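// Illustration (not from this patch): the loop above is now the whole
// per-application scheduling path; locality, headroom and reservation
// handling all live behind containerAllocator.allocate(). An alternative
// allocator only has to honor the AllocationState contract sketched here
// (state names other than PRIORITY_SKIPPED are assumptions; only
// PRIORITY_SKIPPED appears in this hunk).
ContainerAllocation res = containerAllocator.allocate(clusterResource,
node, schedulingMode, currentResourceLimits, priority, null);
if (res.getAllocationState() == AllocationState.PRIORITY_SKIPPED) {
// fall through to this application's next priority
} else {
// any other state ends the loop and is translated into a CSAssignment
// by getCSAssignmentFromAllocateResult above
}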
public synchronized CSAssignment assignReservedContainer(
FiCaSchedulerNode node, RMContainer rmContainer,
Resource clusterResource, SchedulingMode schedulingMode) {
ContainerAllocation result =
containerAllocator.allocate(clusterResource, node,
schedulingMode, new ResourceLimits(Resources.none()),
rmContainer.getReservedPriority(), rmContainer);
return getCSAssignmentFromAllocateResult(clusterResource, result);
}
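// Illustration (not from this patch): a reservation is re-offered through
// the same allocator entry point when its node heartbeats. The
// isFulfilledReservation() getter is assumed to pair with the
// setFulfilledReservation(true) setter shown in this change; 'app',
// 'node', 'rmContainer' and 'clusterResource' are assumed in scope.
CSAssignment fulfilled = app.assignReservedContainer(node, rmContainer,
clusterResource, SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
if (fulfilled.isFulfilledReservation()) {
// the capacity charged at reservation time became a real allocation
}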
}

View File

@ -1489,7 +1489,7 @@ public void testReservationExchange() throws Exception {
ContainerState.COMPLETE, "",
ContainerExitStatus.KILLED_BY_RESOURCEMANAGER),
RMContainerEventType.KILL, null, true);
a.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
assertEquals(4*GB, a.getUsedResources().getMemory());
assertEquals(0*GB, app_0.getCurrentConsumption().getMemory());
@ -1498,7 +1498,20 @@ public void testReservationExchange() throws Exception {
assertEquals(0*GB, node_0.getUsedResource().getMemory());
}
private void verifyContainerAllocated(CSAssignment assignment, NodeType nodeType) {
Assert.assertTrue(Resources.greaterThan(resourceCalculator, null,
assignment.getResource(), Resources.none()));
Assert
.assertTrue(assignment.getAssignmentInformation().getNumAllocations() > 0);
Assert.assertEquals(nodeType, assignment.getType());
}
private void verifyNoContainerAllocated(CSAssignment assignment) {
Assert.assertTrue(Resources.equals(assignment.getResource(),
Resources.none()));
Assert
.assertTrue(assignment.getAssignmentInformation().getNumAllocations() == 0);
}
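// Illustration: these two helpers replace the Mockito verify(...) checks
// used previously; the tests below assert on the returned CSAssignment
// instead, e.g.:
// CSAssignment assignment = a.assignContainers(clusterResource, node_0,
//     new ResourceLimits(clusterResource),
//     SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
// verifyContainerAllocated(assignment, NodeType.NODE_LOCAL);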
@Test
public void testLocalityScheduling() throws Exception {
@ -1512,11 +1525,11 @@ public void testLocalityScheduling() throws Exception {
// Submit applications
final ApplicationAttemptId appAttemptId_0 =
TestUtils.getMockApplicationAttemptId(0, 0);
FiCaSchedulerApp app_0 =
new FiCaSchedulerApp(appAttemptId_0, user_0, a,
mock(ActiveUsersManager.class), spyRMContext);
a.submitApplicationAttempt(app_0, user_0);
// Setup some nodes and racks
String host_0 = "127.0.0.1";
String rack_0 = "rack_0";
@ -1561,8 +1574,7 @@ public void testLocalityScheduling() throws Exception {
// Start with off switch, shouldn't allocate due to delay scheduling
assignment = a.assignContainers(clusterResource, node_2,
new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
verifyNoContainerAllocated(assignment);
assertEquals(1, app_0.getSchedulingOpportunities(priority));
assertEquals(3, app_0.getTotalRequiredResources(priority));
assertEquals(NodeType.NODE_LOCAL, assignment.getType()); // None->NODE_LOCAL
@ -1570,8 +1582,7 @@ public void testLocalityScheduling() throws Exception {
// Another off switch, shouldn't allocate due to delay scheduling
assignment = a.assignContainers(clusterResource, node_2,
new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
verifyNoContainerAllocated(assignment);
assertEquals(2, app_0.getSchedulingOpportunities(priority));
assertEquals(3, app_0.getTotalRequiredResources(priority));
assertEquals(NodeType.NODE_LOCAL, assignment.getType()); // None->NODE_LOCAL
@ -1579,8 +1590,7 @@ public void testLocalityScheduling() throws Exception {
// Another off switch, shouldn't allocate due to delay scheduling
assignment = a.assignContainers(clusterResource, node_2,
new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
verifyNoContainerAllocated(assignment);
assertEquals(3, app_0.getSchedulingOpportunities(priority));
assertEquals(3, app_0.getTotalRequiredResources(priority));
assertEquals(NodeType.NODE_LOCAL, assignment.getType()); // None->NODE_LOCAL
@ -1589,26 +1599,21 @@ public void testLocalityScheduling() throws Exception {
// since missedOpportunities=3 and reqdContainers=3
assignment = a.assignContainers(clusterResource, node_2,
new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
verifyContainerAllocated(assignment, NodeType.OFF_SWITCH);
assertEquals(4, app_0.getSchedulingOpportunities(priority)); // should NOT reset
assertEquals(2, app_0.getTotalRequiredResources(priority));
assertEquals(NodeType.OFF_SWITCH, assignment.getType());
// NODE_LOCAL - node_0
assignment = a.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
verifyContainerAllocated(assignment, NodeType.NODE_LOCAL);
assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should reset
assertEquals(1, app_0.getTotalRequiredResources(priority));
assertEquals(NodeType.NODE_LOCAL, assignment.getType());
// NODE_LOCAL - node_1
assignment = a.assignContainers(clusterResource, node_1,
new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
verifyContainerAllocated(assignment, NodeType.NODE_LOCAL);
assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should reset
assertEquals(0, app_0.getTotalRequiredResources(priority));
assertEquals(NodeType.NODE_LOCAL, assignment.getType());
@ -1638,16 +1643,13 @@ public void testLocalityScheduling() throws Exception {
new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
assertEquals(1, app_0.getSchedulingOpportunities(priority));
assertEquals(2, app_0.getTotalRequiredResources(priority));
assertEquals(NodeType.NODE_LOCAL, assignment.getType()); // None->NODE_LOCAL
// Should assign RACK_LOCAL now
assignment = a.assignContainers(clusterResource, node_3,
new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
verifyContainerAllocated(assignment, NodeType.RACK_LOCAL);
assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should reset
assertEquals(1, app_0.getTotalRequiredResources(priority));
assertEquals(NodeType.RACK_LOCAL, assignment.getType());
}
@Test
@ -1661,9 +1663,9 @@ public void testApplicationPriorityScheduling() throws Exception {
// Submit applications
final ApplicationAttemptId appAttemptId_0 =
TestUtils.getMockApplicationAttemptId(0, 0);
FiCaSchedulerApp app_0 =
new FiCaSchedulerApp(appAttemptId_0, user_0, a,
mock(ActiveUsersManager.class), spyRMContext);
a.submitApplicationAttempt(app_0, user_0);
// Setup some nodes and racks
@ -1723,63 +1725,48 @@ public void testApplicationPriorityScheduling() throws Exception {
// Start with off switch, shouldn't allocate P1 due to delay scheduling
// thus, no P2 either!
CSAssignment assignment = a.assignContainers(clusterResource, node_2,
new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
verifyNoContainerAllocated(assignment);
assertEquals(1, app_0.getSchedulingOpportunities(priority_1));
assertEquals(2, app_0.getTotalRequiredResources(priority_1));
assertEquals(0, app_0.getSchedulingOpportunities(priority_2));
assertEquals(1, app_0.getTotalRequiredResources(priority_2));
assertEquals(0, app_0.getSchedulingOpportunities(priority_2));
assertEquals(1, app_0.getTotalRequiredResources(priority_2));
// Another off-switch, shouldn't allocate P1 due to delay scheduling
// thus, no P2 either!
assignment = a.assignContainers(clusterResource, node_2,
new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
verifyNoContainerAllocated(assignment);
assertEquals(2, app_0.getSchedulingOpportunities(priority_1));
assertEquals(2, app_0.getTotalRequiredResources(priority_1));
assertEquals(0, app_0.getSchedulingOpportunities(priority_2));
assertEquals(1, app_0.getTotalRequiredResources(priority_2));
// Another off-switch, shouldn't allocate OFF_SWITCH P1
assignment = a.assignContainers(clusterResource, node_2,
new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
verifyContainerAllocated(assignment, NodeType.OFF_SWITCH);
assertEquals(3, app_0.getSchedulingOpportunities(priority_1));
assertEquals(1, app_0.getTotalRequiredResources(priority_1));
assertEquals(0, app_0.getSchedulingOpportunities(priority_2));
assertEquals(1, app_0.getTotalRequiredResources(priority_2));
// Now, DATA_LOCAL for P1
assignment = a.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
verifyContainerAllocated(assignment, NodeType.NODE_LOCAL);
assertEquals(0, app_0.getSchedulingOpportunities(priority_1));
assertEquals(0, app_0.getTotalRequiredResources(priority_1));
assertEquals(0, app_0.getSchedulingOpportunities(priority_2));
assertEquals(1, app_0.getTotalRequiredResources(priority_2));
// Now, OFF_SWITCH for P2
assignment = a.assignContainers(clusterResource, node_1,
new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
verifyContainerAllocated(assignment, NodeType.OFF_SWITCH);
assertEquals(0, app_0.getSchedulingOpportunities(priority_1));
assertEquals(0, app_0.getTotalRequiredResources(priority_1));
assertEquals(1, app_0.getSchedulingOpportunities(priority_2));
assertEquals(0, app_0.getTotalRequiredResources(priority_2));
@ -1798,8 +1785,8 @@ public void testSchedulingConstraints() throws Exception {
final ApplicationAttemptId appAttemptId_0 =
TestUtils.getMockApplicationAttemptId(0, 0);
FiCaSchedulerApp app_0 =
new FiCaSchedulerApp(appAttemptId_0, user_0, a,
mock(ActiveUsersManager.class), spyRMContext);
a.submitApplicationAttempt(app_0, user_0);
// Setup some nodes and racks
@ -1849,19 +1836,17 @@ public void testSchedulingConstraints() throws Exception {
app_0.updateResourceRequests(app_0_requests_0);
// NODE_LOCAL - node_0_1
CSAssignment assignment = a.assignContainers(clusterResource, node_0_0,
new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
verifyContainerAllocated(assignment, NodeType.NODE_LOCAL);
assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should reset
assertEquals(0, app_0.getTotalRequiredResources(priority));
// No allocation on node_1_0 even though it's node/rack local since
// required(ANY) == 0
assignment = a.assignContainers(clusterResource, node_1_0,
new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
verifyNoContainerAllocated(assignment);
assertEquals(0, app_0.getSchedulingOpportunities(priority)); // Still zero
// since #req=0
assertEquals(0, app_0.getTotalRequiredResources(priority));
@ -1875,21 +1860,18 @@ public void testSchedulingConstraints() throws Exception {
// No allocation on node_0_1 even though it's node/rack local since
// required(rack_1) == 0
assignment = a.assignContainers(clusterResource, node_0_1,
new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
verifyNoContainerAllocated(assignment);
assertEquals(1, app_0.getSchedulingOpportunities(priority));
assertEquals(1, app_0.getTotalRequiredResources(priority));
// NODE_LOCAL - node_1
assignment = a.assignContainers(clusterResource, node_1_0,
new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
verifyContainerAllocated(assignment, NodeType.NODE_LOCAL);
assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should reset
assertEquals(0, app_0.getTotalRequiredResources(priority));
}
@Test (timeout = 30000)
@ -2067,16 +2049,16 @@ public void testLocalityConstraints() throws Exception {
// Submit applications
final ApplicationAttemptId appAttemptId_0 =
TestUtils.getMockApplicationAttemptId(0, 0);
FiCaSchedulerApp app_0 =
new FiCaSchedulerApp(appAttemptId_0, user_0, a,
mock(ActiveUsersManager.class), spyRMContext);
a.submitApplicationAttempt(app_0, user_0);
final ApplicationAttemptId appAttemptId_1 =
TestUtils.getMockApplicationAttemptId(1, 0);
FiCaSchedulerApp app_1 =
new FiCaSchedulerApp(appAttemptId_1, user_0, a,
mock(ActiveUsersManager.class), spyRMContext);
a.submitApplicationAttempt(app_1, user_0);
// Setup some nodes and racks
@ -2136,10 +2118,10 @@ public void testLocalityConstraints() throws Exception {
// node_0_1
// Shouldn't allocate since RR(rack_0) = null && RR(ANY) = relax: false
CSAssignment assignment =
a.assignContainers(clusterResource, node_0_1, new ResourceLimits(
clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
verifyNoContainerAllocated(assignment);
assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should be 0
// resourceName: <priority, memory, #containers, relaxLocality>
@ -2159,10 +2141,9 @@ public void testLocalityConstraints() throws Exception {
// node_1_1
// Shouldn't allocate since RR(rack_1) = relax: false
assignment = a.assignContainers(clusterResource, node_1_1,
new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
verifyNoContainerAllocated(assignment);
assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should be 0
// Allow rack-locality for rack_1, but blacklist node_1_1
@ -2190,10 +2171,9 @@ public void testLocalityConstraints() throws Exception {
// node_1_1
// Shouldn't allocate since node_1_1 is blacklisted
assignment = a.assignContainers(clusterResource, node_1_1,
new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
verifyNoContainerAllocated(assignment);
assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should be 0
// Now, remove node_1_1 from blacklist, but add rack_1 to blacklist
@ -2219,10 +2199,9 @@ public void testLocalityConstraints() throws Exception {
// node_1_1
// Shouldn't allocate since rack_1 is blacklisted
assignment = a.assignContainers(clusterResource, node_1_1,
new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
verifyNoContainerAllocated(assignment);
assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should be 0
// Now remove rack_1 from blacklist
@ -2246,10 +2225,9 @@ public void testLocalityConstraints() throws Exception {
// Blacklist: < host_0_0 > <----
// Now, should allocate since RR(rack_1) = relax: true
assignment = a.assignContainers(clusterResource, node_1_1,
new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
verifyNoContainerAllocated(assignment);
assertEquals(0, app_0.getSchedulingOpportunities(priority));
assertEquals(1, app_0.getTotalRequiredResources(priority));
@ -2277,10 +2255,9 @@ public void testLocalityConstraints() throws Exception {
// host_1_0: 8G
// host_1_1: 7G
assignment = a.assignContainers(clusterResource, node_1_0,
new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
verifyContainerAllocated(assignment, NodeType.NODE_LOCAL);
assertEquals(0, app_0.getSchedulingOpportunities(priority));
assertEquals(0, app_0.getTotalRequiredResources(priority));