YARN-4026. Refactored ContainerAllocator to accept a list of priorites rather than a single priority. Contributed by Wangda Tan

(cherry picked from commit e5003be907)
This commit is contained in:
Jian He 2015-08-12 15:07:50 -07:00
parent c5b20e0db6
commit 65d22b3686
7 changed files with 315 additions and 207 deletions

View File

@ -341,6 +341,9 @@ Release 2.8.0 - UNRELEASED
YARN-3966. Fix excessive loggings in CapacityScheduler. (Jian He via wangda) YARN-3966. Fix excessive loggings in CapacityScheduler. (Jian He via wangda)
YARN-4026. Refactored ContainerAllocator to accept a list of priorites
rather than a single priority. (Wangda Tan via jianhe)
OPTIMIZATIONS OPTIMIZATIONS
YARN-3339. TestDockerContainerExecutor should pull a single image and not YARN-3339. TestDockerContainerExecutor should pull a single image and not

View File

@ -763,8 +763,9 @@ public class LeafQueue extends AbstractCSQueue {
FiCaSchedulerApp application = FiCaSchedulerApp application =
getApplication(reservedContainer.getApplicationAttemptId()); getApplication(reservedContainer.getApplicationAttemptId());
synchronized (application) { synchronized (application) {
CSAssignment assignment = application.assignReservedContainer(node, reservedContainer, CSAssignment assignment =
clusterResource, schedulingMode); application.assignContainers(clusterResource, node,
currentResourceLimits, schedulingMode, reservedContainer);
handleExcessReservedContainer(clusterResource, assignment); handleExcessReservedContainer(clusterResource, assignment);
return assignment; return assignment;
} }
@ -812,7 +813,7 @@ public class LeafQueue extends AbstractCSQueue {
// Try to schedule // Try to schedule
CSAssignment assignment = CSAssignment assignment =
application.assignContainers(clusterResource, node, application.assignContainers(clusterResource, node,
currentResourceLimits, schedulingMode); currentResourceLimits, schedulingMode, null);
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("post-assignContainers for application " LOG.debug("post-assignContainers for application "

View File

@ -25,18 +25,31 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.util.resource.Resources; import org.apache.hadoop.yarn.util.resource.Resources;
public class ContainerAllocation { public class ContainerAllocation {
/**
* Skip the locality (e.g. node-local, rack-local, any), and look at other
* localities of the same priority
*/
public static final ContainerAllocation LOCALITY_SKIPPED =
new ContainerAllocation(null, null, AllocationState.LOCALITY_SKIPPED);
/**
* Skip the priority, and look at other priorities of the same application
*/
public static final ContainerAllocation PRIORITY_SKIPPED = public static final ContainerAllocation PRIORITY_SKIPPED =
new ContainerAllocation(null, null, AllocationState.PRIORITY_SKIPPED); new ContainerAllocation(null, null, AllocationState.PRIORITY_SKIPPED);
/**
* Skip the application, and look at other applications of the same queue
*/
public static final ContainerAllocation APP_SKIPPED = public static final ContainerAllocation APP_SKIPPED =
new ContainerAllocation(null, null, AllocationState.APP_SKIPPED); new ContainerAllocation(null, null, AllocationState.APP_SKIPPED);
/**
* Skip the leaf-queue, and look at other queues of the same parent queue
*/
public static final ContainerAllocation QUEUE_SKIPPED = public static final ContainerAllocation QUEUE_SKIPPED =
new ContainerAllocation(null, null, AllocationState.QUEUE_SKIPPED); new ContainerAllocation(null, null, AllocationState.QUEUE_SKIPPED);
public static final ContainerAllocation LOCALITY_SKIPPED =
new ContainerAllocation(null, null, AllocationState.LOCALITY_SKIPPED);
RMContainer containerToBeUnreserved; RMContainer containerToBeUnreserved;
private Resource resourceToBeAllocated = Resources.none(); private Resource resourceToBeAllocated = Resources.none();
AllocationState state; AllocationState state;

View File

@ -18,12 +18,15 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocator; package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocator;
import org.apache.hadoop.yarn.api.records.Priority; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAssignment;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
@ -36,6 +39,8 @@ import org.apache.hadoop.yarn.util.resource.Resources;
* extensible. * extensible.
*/ */
public abstract class ContainerAllocator { public abstract class ContainerAllocator {
private static final Log LOG = LogFactory.getLog(ContainerAllocator.class);
FiCaSchedulerApp application; FiCaSchedulerApp application;
final ResourceCalculator rc; final ResourceCalculator rc;
final RMContext rmContext; final RMContext rmContext;
@ -47,26 +52,7 @@ public abstract class ContainerAllocator {
this.rmContext = rmContext; this.rmContext = rmContext;
} }
/** protected boolean checkHeadroom(Resource clusterResource,
* preAllocation is to perform checks, etc. to see if we can/cannot allocate
* container. It will put necessary information to returned
* {@link ContainerAllocation}.
*/
abstract ContainerAllocation preAllocation(
Resource clusterResource, FiCaSchedulerNode node,
SchedulingMode schedulingMode, ResourceLimits resourceLimits,
Priority priority, RMContainer reservedContainer);
/**
* doAllocation is to update application metrics, create containers, etc.
* According to allocating conclusion decided by preAllocation.
*/
abstract ContainerAllocation doAllocation(
ContainerAllocation allocationResult, Resource clusterResource,
FiCaSchedulerNode node, SchedulingMode schedulingMode, Priority priority,
RMContainer reservedContainer);
boolean checkHeadroom(Resource clusterResource,
ResourceLimits currentResourceLimits, Resource required, ResourceLimits currentResourceLimits, Resource required,
FiCaSchedulerNode node) { FiCaSchedulerNode node) {
// If headroom + currentReservation < required, we cannot allocate this // If headroom + currentReservation < required, we cannot allocate this
@ -84,6 +70,68 @@ public abstract class ContainerAllocator {
required); required);
} }
protected CSAssignment getCSAssignmentFromAllocateResult(
Resource clusterResource, ContainerAllocation result,
RMContainer rmContainer) {
// Handle skipped
boolean skipped =
(result.getAllocationState() == AllocationState.APP_SKIPPED);
CSAssignment assignment = new CSAssignment(skipped);
assignment.setApplication(application);
// Handle excess reservation
assignment.setExcessReservation(result.getContainerToBeUnreserved());
// If we allocated something
if (Resources.greaterThan(rc, clusterResource,
result.getResourceToBeAllocated(), Resources.none())) {
Resource allocatedResource = result.getResourceToBeAllocated();
Container updatedContainer = result.getUpdatedContainer();
assignment.setResource(allocatedResource);
assignment.setType(result.getContainerNodeType());
if (result.getAllocationState() == AllocationState.RESERVED) {
// This is a reserved container
LOG.info("Reserved container " + " application="
+ application.getApplicationId() + " resource=" + allocatedResource
+ " queue=" + this.toString() + " cluster=" + clusterResource);
assignment.getAssignmentInformation().addReservationDetails(
updatedContainer.getId(),
application.getCSLeafQueue().getQueuePath());
assignment.getAssignmentInformation().incrReservations();
Resources.addTo(assignment.getAssignmentInformation().getReserved(),
allocatedResource);
} else if (result.getAllocationState() == AllocationState.ALLOCATED){
// This is a new container
// Inform the ordering policy
LOG.info("assignedContainer" + " application attempt="
+ application.getApplicationAttemptId() + " container="
+ updatedContainer.getId() + " queue=" + this + " clusterResource="
+ clusterResource);
application
.getCSLeafQueue()
.getOrderingPolicy()
.containerAllocated(application,
application.getRMContainer(updatedContainer.getId()));
assignment.getAssignmentInformation().addAllocationDetails(
updatedContainer.getId(),
application.getCSLeafQueue().getQueuePath());
assignment.getAssignmentInformation().incrAllocations();
Resources.addTo(assignment.getAssignmentInformation().getAllocated(),
allocatedResource);
if (rmContainer != null) {
assignment.setFulfilledReservation(true);
}
}
}
return assignment;
}
/** /**
* allocate needs to handle following stuffs: * allocate needs to handle following stuffs:
* *
@ -96,20 +144,7 @@ public abstract class ContainerAllocator {
* container, this will also update metrics</li> * container, this will also update metrics</li>
* </ul> * </ul>
*/ */
public ContainerAllocation allocate(Resource clusterResource, public abstract CSAssignment assignContainers(Resource clusterResource,
FiCaSchedulerNode node, SchedulingMode schedulingMode, FiCaSchedulerNode node, SchedulingMode schedulingMode,
ResourceLimits resourceLimits, Priority priority, ResourceLimits resourceLimits, RMContainer reservedContainer);
RMContainer reservedContainer) {
ContainerAllocation result =
preAllocation(clusterResource, node, schedulingMode,
resourceLimits, priority, reservedContainer);
if (AllocationState.ALLOCATED == result.state
|| AllocationState.RESERVED == result.state) {
result = doAllocation(result, clusterResource, node,
schedulingMode, priority, reservedContainer);
}
return result;
}
} }

View File

@ -34,6 +34,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAssignment;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
@ -154,7 +155,6 @@ public class RegularContainerAllocator extends ContainerAllocator {
return null; return null;
} }
@Override
ContainerAllocation preAllocation(Resource clusterResource, ContainerAllocation preAllocation(Resource clusterResource,
FiCaSchedulerNode node, SchedulingMode schedulingMode, FiCaSchedulerNode node, SchedulingMode schedulingMode,
ResourceLimits resourceLimits, Priority priority, ResourceLimits resourceLimits, Priority priority,
@ -295,14 +295,14 @@ public class RegularContainerAllocator extends ContainerAllocator {
schedulingMode, currentResoureLimits); schedulingMode, currentResoureLimits);
} }
return ContainerAllocation.QUEUE_SKIPPED; return ContainerAllocation.APP_SKIPPED;
} }
private ContainerAllocation assignContainersOnNode(Resource clusterResource, private ContainerAllocation assignContainersOnNode(Resource clusterResource,
FiCaSchedulerNode node, Priority priority, RMContainer reservedContainer, FiCaSchedulerNode node, Priority priority, RMContainer reservedContainer,
SchedulingMode schedulingMode, ResourceLimits currentResoureLimits) { SchedulingMode schedulingMode, ResourceLimits currentResoureLimits) {
ContainerAllocation assigned; ContainerAllocation allocation;
NodeType requestType = null; NodeType requestType = null;
// Data-local // Data-local
@ -310,14 +310,14 @@ public class RegularContainerAllocator extends ContainerAllocator {
application.getResourceRequest(priority, node.getNodeName()); application.getResourceRequest(priority, node.getNodeName());
if (nodeLocalResourceRequest != null) { if (nodeLocalResourceRequest != null) {
requestType = NodeType.NODE_LOCAL; requestType = NodeType.NODE_LOCAL;
assigned = allocation =
assignNodeLocalContainers(clusterResource, nodeLocalResourceRequest, assignNodeLocalContainers(clusterResource, nodeLocalResourceRequest,
node, priority, reservedContainer, schedulingMode, node, priority, reservedContainer, schedulingMode,
currentResoureLimits); currentResoureLimits);
if (Resources.greaterThan(rc, clusterResource, if (Resources.greaterThan(rc, clusterResource,
assigned.getResourceToBeAllocated(), Resources.none())) { allocation.getResourceToBeAllocated(), Resources.none())) {
assigned.requestNodeType = requestType; allocation.requestNodeType = requestType;
return assigned; return allocation;
} }
} }
@ -333,14 +333,14 @@ public class RegularContainerAllocator extends ContainerAllocator {
requestType = NodeType.RACK_LOCAL; requestType = NodeType.RACK_LOCAL;
} }
assigned = allocation =
assignRackLocalContainers(clusterResource, rackLocalResourceRequest, assignRackLocalContainers(clusterResource, rackLocalResourceRequest,
node, priority, reservedContainer, schedulingMode, node, priority, reservedContainer, schedulingMode,
currentResoureLimits); currentResoureLimits);
if (Resources.greaterThan(rc, clusterResource, if (Resources.greaterThan(rc, clusterResource,
assigned.getResourceToBeAllocated(), Resources.none())) { allocation.getResourceToBeAllocated(), Resources.none())) {
assigned.requestNodeType = requestType; allocation.requestNodeType = requestType;
return assigned; return allocation;
} }
} }
@ -356,13 +356,19 @@ public class RegularContainerAllocator extends ContainerAllocator {
requestType = NodeType.OFF_SWITCH; requestType = NodeType.OFF_SWITCH;
} }
assigned = allocation =
assignOffSwitchContainers(clusterResource, offSwitchResourceRequest, assignOffSwitchContainers(clusterResource, offSwitchResourceRequest,
node, priority, reservedContainer, schedulingMode, node, priority, reservedContainer, schedulingMode,
currentResoureLimits); currentResoureLimits);
assigned.requestNodeType = requestType; allocation.requestNodeType = requestType;
return assigned; // When a returned allocation is LOCALITY_SKIPPED, since we're in
// off-switch request now, we will skip this app w.r.t priorities
if (allocation.state == AllocationState.LOCALITY_SKIPPED) {
allocation.state = AllocationState.APP_SKIPPED;
}
return allocation;
} }
return ContainerAllocation.PRIORITY_SKIPPED; return ContainerAllocation.PRIORITY_SKIPPED;
@ -388,7 +394,7 @@ public class RegularContainerAllocator extends ContainerAllocator {
// to label not match. This can be caused by node label changed // to label not match. This can be caused by node label changed
// We should un-reserve this container. // We should un-reserve this container.
return new ContainerAllocation(rmContainer, null, return new ContainerAllocation(rmContainer, null,
AllocationState.QUEUE_SKIPPED); AllocationState.LOCALITY_SKIPPED);
} }
Resource capability = request.getCapability(); Resource capability = request.getCapability();
@ -400,7 +406,8 @@ public class RegularContainerAllocator extends ContainerAllocator {
LOG.warn("Node : " + node.getNodeID() LOG.warn("Node : " + node.getNodeID()
+ " does not have sufficient resource for request : " + request + " does not have sufficient resource for request : " + request
+ " node total capability : " + node.getTotalResource()); + " node total capability : " + node.getTotalResource());
return ContainerAllocation.QUEUE_SKIPPED; // Skip this locality request
return ContainerAllocation.LOCALITY_SKIPPED;
} }
assert Resources.greaterThan( assert Resources.greaterThan(
@ -457,7 +464,8 @@ public class RegularContainerAllocator extends ContainerAllocator {
// continue)). If we failed to unreserve some resource, we can't // continue)). If we failed to unreserve some resource, we can't
// continue. // continue.
if (null == unreservedContainer) { if (null == unreservedContainer) {
return ContainerAllocation.QUEUE_SKIPPED; // Skip the locality request
return ContainerAllocation.LOCALITY_SKIPPED;
} }
} }
} }
@ -468,19 +476,20 @@ public class RegularContainerAllocator extends ContainerAllocator {
result.containerNodeType = type; result.containerNodeType = type;
return result; return result;
} else { } else {
// if we are allowed to allocate but this node doesn't have space, reserve it or // if we are allowed to allocate but this node doesn't have space, reserve
// if this was an already a reserved container, reserve it again // it or if this was an already a reserved container, reserve it again
if (shouldAllocOrReserveNewContainer || rmContainer != null) { if (shouldAllocOrReserveNewContainer || rmContainer != null) {
if (reservationsContinueLooking && rmContainer == null) { if (reservationsContinueLooking && rmContainer == null) {
// we could possibly ignoring queue capacity or user limits when // we could possibly ignoring queue capacity or user limits when
// reservationsContinueLooking is set. Make sure we didn't need to unreserve // reservationsContinueLooking is set. Make sure we didn't need to
// one. // unreserve one.
if (needToUnreserve) { if (needToUnreserve) {
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("we needed to unreserve to be able to allocate"); LOG.debug("we needed to unreserve to be able to allocate");
} }
return ContainerAllocation.QUEUE_SKIPPED; // Skip the locality request
return ContainerAllocation.LOCALITY_SKIPPED;
} }
} }
@ -490,7 +499,8 @@ public class RegularContainerAllocator extends ContainerAllocator {
result.containerNodeType = type; result.containerNodeType = type;
return result; return result;
} }
return ContainerAllocation.QUEUE_SKIPPED; // Skip the locality request
return ContainerAllocation.LOCALITY_SKIPPED;
} }
} }
@ -563,8 +573,7 @@ public class RegularContainerAllocator extends ContainerAllocator {
// Skip this app if we failed to allocate. // Skip this app if we failed to allocate.
ContainerAllocation ret = ContainerAllocation ret =
new ContainerAllocation(allocationResult.containerToBeUnreserved, new ContainerAllocation(allocationResult.containerToBeUnreserved,
null, AllocationState.QUEUE_SKIPPED); null, AllocationState.APP_SKIPPED);
ret.state = AllocationState.APP_SKIPPED;
return ret; return ret;
} }
@ -578,7 +587,6 @@ public class RegularContainerAllocator extends ContainerAllocator {
return allocationResult; return allocationResult;
} }
@Override
ContainerAllocation doAllocation(ContainerAllocation allocationResult, ContainerAllocation doAllocation(ContainerAllocation allocationResult,
Resource clusterResource, FiCaSchedulerNode node, Resource clusterResource, FiCaSchedulerNode node,
SchedulingMode schedulingMode, Priority priority, SchedulingMode schedulingMode, Priority priority,
@ -591,7 +599,7 @@ public class RegularContainerAllocator extends ContainerAllocator {
// something went wrong getting/creating the container // something went wrong getting/creating the container
if (container == null) { if (container == null) {
LOG.warn("Couldn't get container for allocation!"); LOG.warn("Couldn't get container for allocation!");
return ContainerAllocation.QUEUE_SKIPPED; return ContainerAllocation.APP_SKIPPED;
} }
if (allocationResult.getAllocationState() == AllocationState.ALLOCATED) { if (allocationResult.getAllocationState() == AllocationState.ALLOCATED) {
@ -626,4 +634,65 @@ public class RegularContainerAllocator extends ContainerAllocator {
return allocationResult; return allocationResult;
} }
private ContainerAllocation allocate(Resource clusterResource,
FiCaSchedulerNode node, SchedulingMode schedulingMode,
ResourceLimits resourceLimits, Priority priority,
RMContainer reservedContainer) {
ContainerAllocation result =
preAllocation(clusterResource, node, schedulingMode, resourceLimits,
priority, reservedContainer);
if (AllocationState.ALLOCATED == result.state
|| AllocationState.RESERVED == result.state) {
result =
doAllocation(result, clusterResource, node, schedulingMode, priority,
reservedContainer);
}
return result;
}
@Override
public CSAssignment assignContainers(Resource clusterResource,
FiCaSchedulerNode node, SchedulingMode schedulingMode,
ResourceLimits resourceLimits,
RMContainer reservedContainer) {
if (reservedContainer == null) {
// Check if application needs more resource, skip if it doesn't need more.
if (!application.hasPendingResourceRequest(rc,
node.getPartition(), clusterResource, schedulingMode)) {
if (LOG.isDebugEnabled()) {
LOG.debug("Skip app_attempt=" + application.getApplicationAttemptId()
+ ", because it doesn't need more resource, schedulingMode="
+ schedulingMode.name() + " node-label=" + node.getPartition());
}
return CSAssignment.SKIP_ASSIGNMENT;
}
// Schedule in priority order
for (Priority priority : application.getPriorities()) {
ContainerAllocation result =
allocate(clusterResource, node, schedulingMode, resourceLimits,
priority, null);
AllocationState allocationState = result.getAllocationState();
if (allocationState == AllocationState.PRIORITY_SKIPPED) {
continue;
}
return getCSAssignmentFromAllocateResult(clusterResource, result,
null);
}
// We will reach here if we skipped all priorities of the app, so we will
// skip the app.
return CSAssignment.SKIP_ASSIGNMENT;
} else {
ContainerAllocation result =
allocate(clusterResource, node, schedulingMode, resourceLimits,
reservedContainer.getReservedPriority(), reservedContainer);
return getCSAssignmentFromAllocateResult(clusterResource, result,
reservedContainer);
}
}
} }

View File

@ -57,10 +57,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.Capacity
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocator.AllocationState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocator.ContainerAllocator; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocator.ContainerAllocator;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocator.RegularContainerAllocator; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocator.RegularContainerAllocator;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocator.ContainerAllocation;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources; import org.apache.hadoop.yarn.util.resource.Resources;
@ -431,111 +429,18 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
return (LeafQueue)queue; return (LeafQueue)queue;
} }
private CSAssignment getCSAssignmentFromAllocateResult(
Resource clusterResource, ContainerAllocation result) {
// Handle skipped
boolean skipped =
(result.getAllocationState() == AllocationState.APP_SKIPPED);
CSAssignment assignment = new CSAssignment(skipped);
assignment.setApplication(this);
// Handle excess reservation
assignment.setExcessReservation(result.getContainerToBeUnreserved());
// If we allocated something
if (Resources.greaterThan(rc, clusterResource,
result.getResourceToBeAllocated(), Resources.none())) {
Resource allocatedResource = result.getResourceToBeAllocated();
Container updatedContainer = result.getUpdatedContainer();
assignment.setResource(allocatedResource);
assignment.setType(result.getContainerNodeType());
if (result.getAllocationState() == AllocationState.RESERVED) {
// This is a reserved container
LOG.info("Reserved container " + " application=" + getApplicationId()
+ " resource=" + allocatedResource + " queue="
+ this.toString() + " cluster=" + clusterResource);
assignment.getAssignmentInformation().addReservationDetails(
updatedContainer.getId(), getCSLeafQueue().getQueuePath());
assignment.getAssignmentInformation().incrReservations();
Resources.addTo(assignment.getAssignmentInformation().getReserved(),
allocatedResource);
assignment.setFulfilledReservation(true);
} else {
// This is a new container
// Inform the ordering policy
LOG.info("assignedContainer" + " application attempt="
+ getApplicationAttemptId() + " container="
+ updatedContainer.getId() + " queue=" + this + " clusterResource="
+ clusterResource);
getCSLeafQueue().getOrderingPolicy().containerAllocated(this,
getRMContainer(updatedContainer.getId()));
assignment.getAssignmentInformation().addAllocationDetails(
updatedContainer.getId(), getCSLeafQueue().getQueuePath());
assignment.getAssignmentInformation().incrAllocations();
Resources.addTo(assignment.getAssignmentInformation().getAllocated(),
allocatedResource);
}
}
return assignment;
}
public CSAssignment assignContainers(Resource clusterResource, public CSAssignment assignContainers(Resource clusterResource,
FiCaSchedulerNode node, ResourceLimits currentResourceLimits, FiCaSchedulerNode node, ResourceLimits currentResourceLimits,
SchedulingMode schedulingMode) { SchedulingMode schedulingMode, RMContainer reservedContainer) {
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("pre-assignContainers for application " LOG.debug("pre-assignContainers for application "
+ getApplicationId()); + getApplicationId());
showRequests(); showRequests();
} }
// Check if application needs more resource, skip if it doesn't need more.
if (!hasPendingResourceRequest(rc,
node.getPartition(), clusterResource, schedulingMode)) {
if (LOG.isDebugEnabled()) {
LOG.debug("Skip app_attempt=" + getApplicationAttemptId()
+ ", because it doesn't need more resource, schedulingMode="
+ schedulingMode.name() + " node-label=" + node.getPartition());
}
return CSAssignment.SKIP_ASSIGNMENT;
}
synchronized (this) { synchronized (this) {
// Schedule in priority order return containerAllocator.assignContainers(clusterResource, node,
for (Priority priority : getPriorities()) { schedulingMode, currentResourceLimits, reservedContainer);
ContainerAllocation allocationResult =
containerAllocator.allocate(clusterResource, node,
schedulingMode, currentResourceLimits, priority, null);
// If it's a skipped allocation
AllocationState allocationState = allocationResult.getAllocationState();
if (allocationState == AllocationState.PRIORITY_SKIPPED) {
continue;
}
return getCSAssignmentFromAllocateResult(clusterResource,
allocationResult);
} }
} }
// We will reach here if we skipped all priorities of the app, so we will
// skip the app.
return CSAssignment.SKIP_ASSIGNMENT;
}
public synchronized CSAssignment assignReservedContainer(
FiCaSchedulerNode node, RMContainer rmContainer,
Resource clusterResource, SchedulingMode schedulingMode) {
ContainerAllocation result =
containerAllocator.allocate(clusterResource, node,
schedulingMode, new ResourceLimits(Resources.none()),
rmContainer.getReservedPriority(), rmContainer);
return getCSAssignmentFromAllocateResult(clusterResource, result);
}
} }

View File

@ -622,16 +622,9 @@ public class TestLeafQueue {
final ApplicationAttemptId appAttemptId_1 = final ApplicationAttemptId appAttemptId_1 =
TestUtils.getMockApplicationAttemptId(1, 0); TestUtils.getMockApplicationAttemptId(1, 0);
FiCaSchedulerApp app_1 = FiCaSchedulerApp app_1 =
new FiCaSchedulerApp(appAttemptId_1, user_0, a, new FiCaSchedulerApp(appAttemptId_1, user_1, a,
a.getActiveUsersManager(), spyRMContext); a.getActiveUsersManager(), spyRMContext);
a.submitApplicationAttempt(app_1, user_0); // same user a.submitApplicationAttempt(app_1, user_1); // different user
final ApplicationAttemptId appAttemptId_2 =
TestUtils.getMockApplicationAttemptId(2, 0);
FiCaSchedulerApp app_2 =
new FiCaSchedulerApp(appAttemptId_2, user_1, a,
a.getActiveUsersManager(), spyRMContext);
a.submitApplicationAttempt(app_2, user_1);
// Setup some nodes // Setup some nodes
String host_0 = "127.0.0.1"; String host_0 = "127.0.0.1";
@ -647,7 +640,7 @@ public class TestLeafQueue {
// Setup resource-requests // Setup resource-requests
Priority priority = TestUtils.createMockPriority(1); Priority priority = TestUtils.createMockPriority(1);
app_0.updateResourceRequests(Collections.singletonList( app_0.updateResourceRequests(Collections.singletonList(
TestUtils.createResourceRequest(ResourceRequest.ANY, 2*GB, 1, true, TestUtils.createResourceRequest(ResourceRequest.ANY, 3*GB, 2, true,
priority, recordFactory))); priority, recordFactory)));
app_1.updateResourceRequests(Collections.singletonList( app_1.updateResourceRequests(Collections.singletonList(
@ -662,39 +655,38 @@ public class TestLeafQueue {
a.setUserLimit(50); a.setUserLimit(50);
a.setUserLimitFactor(2); a.setUserLimitFactor(2);
// Now, only user_0 should be active since he is the only one with // There're two active users
// outstanding requests assertEquals(2, a.getActiveUsersManager().getNumActiveUsers());
assertEquals("There should only be 1 active user!",
1, a.getActiveUsersManager().getNumActiveUsers());
// This commented code is key to test 'activeUsers'.
// It should fail the test if uncommented since
// it would increase 'activeUsers' to 2 and stop user_2
// Pre MAPREDUCE-3732 this test should fail without this block too
// app_2.updateResourceRequests(Collections.singletonList(
// TestUtils.createResourceRequest(RMNodeImpl.ANY, 1*GB, 1, priority,
// recordFactory)));
// 1 container to user_0 // 1 container to user_0
a.assignContainers(clusterResource, node_0, a.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
assertEquals(2*GB, a.getUsedResources().getMemory()); assertEquals(3*GB, a.getUsedResources().getMemory());
assertEquals(2*GB, app_0.getCurrentConsumption().getMemory()); assertEquals(3*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0*GB, app_1.getCurrentConsumption().getMemory()); assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
// Again one to user_0 since he hasn't exceeded user limit yet // Allocate one container to app_1. Even if app_0
// submit earlier, it cannot get this container assigned since user_0
// exceeded user-limit already.
a.assignContainers(clusterResource, node_0, a.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
assertEquals(3*GB, a.getUsedResources().getMemory()); assertEquals(4*GB, a.getUsedResources().getMemory());
assertEquals(2*GB, app_0.getCurrentConsumption().getMemory()); assertEquals(3*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(1*GB, app_1.getCurrentConsumption().getMemory()); assertEquals(1*GB, app_1.getCurrentConsumption().getMemory());
// One more to user_0 since he is the only active user // Allocate one container to app_0, before allocating this container,
// user-limit = ceil((4 + 1) / 2) = 3G. app_0's used resource (3G) <=
// user-limit.
a.assignContainers(clusterResource, node_1, a.assignContainers(clusterResource, node_1,
new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
assertEquals(4*GB, a.getUsedResources().getMemory()); assertEquals(7*GB, a.getUsedResources().getMemory());
assertEquals(2*GB, app_0.getCurrentConsumption().getMemory()); assertEquals(6*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(2*GB, app_1.getCurrentConsumption().getMemory()); assertEquals(1*GB, app_1.getCurrentConsumption().getMemory());
// app_0 doesn't have outstanding resources, there's only one active user.
assertEquals("There should only be 1 active user!",
1, a.getActiveUsersManager().getNumActiveUsers());
} }
@Test @Test
@ -2570,6 +2562,96 @@ public class TestLeafQueue {
} }
@Test
public void testLocalityDelaySkipsApplication() throws Exception {
// Manipulate queue 'a'
LeafQueue a = stubLeafQueue((LeafQueue)queues.get(A));
// User
String user_0 = "user_0";
// Submit applications
final ApplicationAttemptId appAttemptId_0 =
TestUtils.getMockApplicationAttemptId(0, 0);
FiCaSchedulerApp app_0 =
new FiCaSchedulerApp(appAttemptId_0, user_0, a,
mock(ActiveUsersManager.class), spyRMContext);
a.submitApplicationAttempt(app_0, user_0);
final ApplicationAttemptId appAttemptId_1 =
TestUtils.getMockApplicationAttemptId(1, 0);
FiCaSchedulerApp app_1 =
new FiCaSchedulerApp(appAttemptId_1, user_0, a,
mock(ActiveUsersManager.class), spyRMContext);
a.submitApplicationAttempt(app_1, user_0);
// Setup some nodes and racks
String host_0 = "127.0.0.1";
String rack_0 = "rack_0";
FiCaSchedulerNode node_0 = TestUtils.getMockNode(host_0, rack_0, 0, 8*GB);
String host_1 = "127.0.0.2";
String rack_1 = "rack_1";
FiCaSchedulerNode node_1 = TestUtils.getMockNode(host_1, rack_1, 0, 8*GB);
String host_2 = "127.0.0.3";
String rack_2 = "rack_2";
FiCaSchedulerNode node_2 = TestUtils.getMockNode(host_2, rack_2, 0, 8*GB);
final int numNodes = 3;
Resource clusterResource =
Resources.createResource(numNodes * (8*GB), numNodes * 16);
when(csContext.getNumClusterNodes()).thenReturn(numNodes);
// Setup resource-requests and submit
// App0 has node local request for host_0/host_1, and app1 has node local
// request for host2.
Priority priority = TestUtils.createMockPriority(1);
List<ResourceRequest> app_0_requests_0 = new ArrayList<ResourceRequest>();
app_0_requests_0.add(
TestUtils.createResourceRequest(host_0, 1*GB, 1,
true, priority, recordFactory));
app_0_requests_0.add(
TestUtils.createResourceRequest(rack_0, 1*GB, 1,
true, priority, recordFactory));
app_0_requests_0.add(
TestUtils.createResourceRequest(host_1, 1*GB, 1,
true, priority, recordFactory));
app_0_requests_0.add(
TestUtils.createResourceRequest(rack_1, 1*GB, 1,
true, priority, recordFactory));
app_0_requests_0.add(
TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 3, // one extra
true, priority, recordFactory));
app_0.updateResourceRequests(app_0_requests_0);
List<ResourceRequest> app_1_requests_0 = new ArrayList<ResourceRequest>();
app_1_requests_0.add(
TestUtils.createResourceRequest(host_2, 1*GB, 1,
true, priority, recordFactory));
app_1_requests_0.add(
TestUtils.createResourceRequest(rack_2, 1*GB, 1,
true, priority, recordFactory));
app_1_requests_0.add(
TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 1, // one extra
true, priority, recordFactory));
app_1.updateResourceRequests(app_1_requests_0);
// Start testing...
// When doing allocation, even if app_0 submit earlier than app_1, app_1 can
// still get allocated because app_0 is waiting for node-locality-delay
CSAssignment assignment = null;
// Check app_0's scheduling opportunities increased and app_1 get allocated
assignment = a.assignContainers(clusterResource, node_2,
new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
verifyContainerAllocated(assignment, NodeType.NODE_LOCAL);
assertEquals(1, app_0.getSchedulingOpportunities(priority));
assertEquals(3, app_0.getTotalRequiredResources(priority));
assertEquals(0, app_0.getLiveContainers().size());
assertEquals(1, app_1.getLiveContainers().size());
}
private List<FiCaSchedulerApp> createListOfApps(int noOfApps, String user, private List<FiCaSchedulerApp> createListOfApps(int noOfApps, String user,
LeafQueue defaultQueue) { LeafQueue defaultQueue) {
List<FiCaSchedulerApp> appsLists = new ArrayList<FiCaSchedulerApp>(); List<FiCaSchedulerApp> appsLists = new ArrayList<FiCaSchedulerApp>();