YARN-4026. Refactored ContainerAllocator to accept a list of priorites rather than a single priority. Contributed by Wangda Tan
This commit is contained in:
parent
1c12adb71f
commit
e5003be907
|
@ -393,6 +393,9 @@ Release 2.8.0 - UNRELEASED
|
||||||
|
|
||||||
YARN-3966. Fix excessive loggings in CapacityScheduler. (Jian He via wangda)
|
YARN-3966. Fix excessive loggings in CapacityScheduler. (Jian He via wangda)
|
||||||
|
|
||||||
|
YARN-4026. Refactored ContainerAllocator to accept a list of priorites
|
||||||
|
rather than a single priority. (Wangda Tan via jianhe)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
YARN-3339. TestDockerContainerExecutor should pull a single image and not
|
YARN-3339. TestDockerContainerExecutor should pull a single image and not
|
||||||
|
|
|
@ -763,8 +763,9 @@ public class LeafQueue extends AbstractCSQueue {
|
||||||
FiCaSchedulerApp application =
|
FiCaSchedulerApp application =
|
||||||
getApplication(reservedContainer.getApplicationAttemptId());
|
getApplication(reservedContainer.getApplicationAttemptId());
|
||||||
synchronized (application) {
|
synchronized (application) {
|
||||||
CSAssignment assignment = application.assignReservedContainer(node, reservedContainer,
|
CSAssignment assignment =
|
||||||
clusterResource, schedulingMode);
|
application.assignContainers(clusterResource, node,
|
||||||
|
currentResourceLimits, schedulingMode, reservedContainer);
|
||||||
handleExcessReservedContainer(clusterResource, assignment);
|
handleExcessReservedContainer(clusterResource, assignment);
|
||||||
return assignment;
|
return assignment;
|
||||||
}
|
}
|
||||||
|
@ -812,7 +813,7 @@ public class LeafQueue extends AbstractCSQueue {
|
||||||
// Try to schedule
|
// Try to schedule
|
||||||
CSAssignment assignment =
|
CSAssignment assignment =
|
||||||
application.assignContainers(clusterResource, node,
|
application.assignContainers(clusterResource, node,
|
||||||
currentResourceLimits, schedulingMode);
|
currentResourceLimits, schedulingMode, null);
|
||||||
|
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
LOG.debug("post-assignContainers for application "
|
LOG.debug("post-assignContainers for application "
|
||||||
|
|
|
@ -25,18 +25,31 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
|
||||||
import org.apache.hadoop.yarn.util.resource.Resources;
|
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||||
|
|
||||||
public class ContainerAllocation {
|
public class ContainerAllocation {
|
||||||
|
/**
|
||||||
|
* Skip the locality (e.g. node-local, rack-local, any), and look at other
|
||||||
|
* localities of the same priority
|
||||||
|
*/
|
||||||
|
public static final ContainerAllocation LOCALITY_SKIPPED =
|
||||||
|
new ContainerAllocation(null, null, AllocationState.LOCALITY_SKIPPED);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Skip the priority, and look at other priorities of the same application
|
||||||
|
*/
|
||||||
public static final ContainerAllocation PRIORITY_SKIPPED =
|
public static final ContainerAllocation PRIORITY_SKIPPED =
|
||||||
new ContainerAllocation(null, null, AllocationState.PRIORITY_SKIPPED);
|
new ContainerAllocation(null, null, AllocationState.PRIORITY_SKIPPED);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Skip the application, and look at other applications of the same queue
|
||||||
|
*/
|
||||||
public static final ContainerAllocation APP_SKIPPED =
|
public static final ContainerAllocation APP_SKIPPED =
|
||||||
new ContainerAllocation(null, null, AllocationState.APP_SKIPPED);
|
new ContainerAllocation(null, null, AllocationState.APP_SKIPPED);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Skip the leaf-queue, and look at other queues of the same parent queue
|
||||||
|
*/
|
||||||
public static final ContainerAllocation QUEUE_SKIPPED =
|
public static final ContainerAllocation QUEUE_SKIPPED =
|
||||||
new ContainerAllocation(null, null, AllocationState.QUEUE_SKIPPED);
|
new ContainerAllocation(null, null, AllocationState.QUEUE_SKIPPED);
|
||||||
|
|
||||||
public static final ContainerAllocation LOCALITY_SKIPPED =
|
|
||||||
new ContainerAllocation(null, null, AllocationState.LOCALITY_SKIPPED);
|
|
||||||
|
|
||||||
RMContainer containerToBeUnreserved;
|
RMContainer containerToBeUnreserved;
|
||||||
private Resource resourceToBeAllocated = Resources.none();
|
private Resource resourceToBeAllocated = Resources.none();
|
||||||
AllocationState state;
|
AllocationState state;
|
||||||
|
@ -50,26 +63,26 @@ public class ContainerAllocation {
|
||||||
this.resourceToBeAllocated = resourceToBeAllocated;
|
this.resourceToBeAllocated = resourceToBeAllocated;
|
||||||
this.state = state;
|
this.state = state;
|
||||||
}
|
}
|
||||||
|
|
||||||
public RMContainer getContainerToBeUnreserved() {
|
public RMContainer getContainerToBeUnreserved() {
|
||||||
return containerToBeUnreserved;
|
return containerToBeUnreserved;
|
||||||
}
|
}
|
||||||
|
|
||||||
public Resource getResourceToBeAllocated() {
|
public Resource getResourceToBeAllocated() {
|
||||||
if (resourceToBeAllocated == null) {
|
if (resourceToBeAllocated == null) {
|
||||||
return Resources.none();
|
return Resources.none();
|
||||||
}
|
}
|
||||||
return resourceToBeAllocated;
|
return resourceToBeAllocated;
|
||||||
}
|
}
|
||||||
|
|
||||||
public AllocationState getAllocationState() {
|
public AllocationState getAllocationState() {
|
||||||
return state;
|
return state;
|
||||||
}
|
}
|
||||||
|
|
||||||
public NodeType getContainerNodeType() {
|
public NodeType getContainerNodeType() {
|
||||||
return containerNodeType;
|
return containerNodeType;
|
||||||
}
|
}
|
||||||
|
|
||||||
public Container getUpdatedContainer() {
|
public Container getUpdatedContainer() {
|
||||||
return updatedContainer;
|
return updatedContainer;
|
||||||
}
|
}
|
||||||
|
|
|
@ -18,12 +18,15 @@
|
||||||
|
|
||||||
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocator;
|
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocator;
|
||||||
|
|
||||||
import org.apache.hadoop.yarn.api.records.Priority;
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.hadoop.yarn.api.records.Container;
|
||||||
import org.apache.hadoop.yarn.api.records.Resource;
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
|
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAssignment;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
|
||||||
|
@ -36,6 +39,8 @@ import org.apache.hadoop.yarn.util.resource.Resources;
|
||||||
* extensible.
|
* extensible.
|
||||||
*/
|
*/
|
||||||
public abstract class ContainerAllocator {
|
public abstract class ContainerAllocator {
|
||||||
|
private static final Log LOG = LogFactory.getLog(ContainerAllocator.class);
|
||||||
|
|
||||||
FiCaSchedulerApp application;
|
FiCaSchedulerApp application;
|
||||||
final ResourceCalculator rc;
|
final ResourceCalculator rc;
|
||||||
final RMContext rmContext;
|
final RMContext rmContext;
|
||||||
|
@ -46,27 +51,8 @@ public abstract class ContainerAllocator {
|
||||||
this.rc = rc;
|
this.rc = rc;
|
||||||
this.rmContext = rmContext;
|
this.rmContext = rmContext;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
protected boolean checkHeadroom(Resource clusterResource,
|
||||||
* preAllocation is to perform checks, etc. to see if we can/cannot allocate
|
|
||||||
* container. It will put necessary information to returned
|
|
||||||
* {@link ContainerAllocation}.
|
|
||||||
*/
|
|
||||||
abstract ContainerAllocation preAllocation(
|
|
||||||
Resource clusterResource, FiCaSchedulerNode node,
|
|
||||||
SchedulingMode schedulingMode, ResourceLimits resourceLimits,
|
|
||||||
Priority priority, RMContainer reservedContainer);
|
|
||||||
|
|
||||||
/**
|
|
||||||
* doAllocation is to update application metrics, create containers, etc.
|
|
||||||
* According to allocating conclusion decided by preAllocation.
|
|
||||||
*/
|
|
||||||
abstract ContainerAllocation doAllocation(
|
|
||||||
ContainerAllocation allocationResult, Resource clusterResource,
|
|
||||||
FiCaSchedulerNode node, SchedulingMode schedulingMode, Priority priority,
|
|
||||||
RMContainer reservedContainer);
|
|
||||||
|
|
||||||
boolean checkHeadroom(Resource clusterResource,
|
|
||||||
ResourceLimits currentResourceLimits, Resource required,
|
ResourceLimits currentResourceLimits, Resource required,
|
||||||
FiCaSchedulerNode node) {
|
FiCaSchedulerNode node) {
|
||||||
// If headroom + currentReservation < required, we cannot allocate this
|
// If headroom + currentReservation < required, we cannot allocate this
|
||||||
|
@ -83,6 +69,68 @@ public abstract class ContainerAllocator {
|
||||||
currentResourceLimits.getHeadroom(), resourceCouldBeUnReserved),
|
currentResourceLimits.getHeadroom(), resourceCouldBeUnReserved),
|
||||||
required);
|
required);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected CSAssignment getCSAssignmentFromAllocateResult(
|
||||||
|
Resource clusterResource, ContainerAllocation result,
|
||||||
|
RMContainer rmContainer) {
|
||||||
|
// Handle skipped
|
||||||
|
boolean skipped =
|
||||||
|
(result.getAllocationState() == AllocationState.APP_SKIPPED);
|
||||||
|
CSAssignment assignment = new CSAssignment(skipped);
|
||||||
|
assignment.setApplication(application);
|
||||||
|
|
||||||
|
// Handle excess reservation
|
||||||
|
assignment.setExcessReservation(result.getContainerToBeUnreserved());
|
||||||
|
|
||||||
|
// If we allocated something
|
||||||
|
if (Resources.greaterThan(rc, clusterResource,
|
||||||
|
result.getResourceToBeAllocated(), Resources.none())) {
|
||||||
|
Resource allocatedResource = result.getResourceToBeAllocated();
|
||||||
|
Container updatedContainer = result.getUpdatedContainer();
|
||||||
|
|
||||||
|
assignment.setResource(allocatedResource);
|
||||||
|
assignment.setType(result.getContainerNodeType());
|
||||||
|
|
||||||
|
if (result.getAllocationState() == AllocationState.RESERVED) {
|
||||||
|
// This is a reserved container
|
||||||
|
LOG.info("Reserved container " + " application="
|
||||||
|
+ application.getApplicationId() + " resource=" + allocatedResource
|
||||||
|
+ " queue=" + this.toString() + " cluster=" + clusterResource);
|
||||||
|
assignment.getAssignmentInformation().addReservationDetails(
|
||||||
|
updatedContainer.getId(),
|
||||||
|
application.getCSLeafQueue().getQueuePath());
|
||||||
|
assignment.getAssignmentInformation().incrReservations();
|
||||||
|
Resources.addTo(assignment.getAssignmentInformation().getReserved(),
|
||||||
|
allocatedResource);
|
||||||
|
} else if (result.getAllocationState() == AllocationState.ALLOCATED){
|
||||||
|
// This is a new container
|
||||||
|
// Inform the ordering policy
|
||||||
|
LOG.info("assignedContainer" + " application attempt="
|
||||||
|
+ application.getApplicationAttemptId() + " container="
|
||||||
|
+ updatedContainer.getId() + " queue=" + this + " clusterResource="
|
||||||
|
+ clusterResource);
|
||||||
|
|
||||||
|
application
|
||||||
|
.getCSLeafQueue()
|
||||||
|
.getOrderingPolicy()
|
||||||
|
.containerAllocated(application,
|
||||||
|
application.getRMContainer(updatedContainer.getId()));
|
||||||
|
|
||||||
|
assignment.getAssignmentInformation().addAllocationDetails(
|
||||||
|
updatedContainer.getId(),
|
||||||
|
application.getCSLeafQueue().getQueuePath());
|
||||||
|
assignment.getAssignmentInformation().incrAllocations();
|
||||||
|
Resources.addTo(assignment.getAssignmentInformation().getAllocated(),
|
||||||
|
allocatedResource);
|
||||||
|
|
||||||
|
if (rmContainer != null) {
|
||||||
|
assignment.setFulfilledReservation(true);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return assignment;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* allocate needs to handle following stuffs:
|
* allocate needs to handle following stuffs:
|
||||||
|
@ -96,20 +144,7 @@ public abstract class ContainerAllocator {
|
||||||
* container, this will also update metrics</li>
|
* container, this will also update metrics</li>
|
||||||
* </ul>
|
* </ul>
|
||||||
*/
|
*/
|
||||||
public ContainerAllocation allocate(Resource clusterResource,
|
public abstract CSAssignment assignContainers(Resource clusterResource,
|
||||||
FiCaSchedulerNode node, SchedulingMode schedulingMode,
|
FiCaSchedulerNode node, SchedulingMode schedulingMode,
|
||||||
ResourceLimits resourceLimits, Priority priority,
|
ResourceLimits resourceLimits, RMContainer reservedContainer);
|
||||||
RMContainer reservedContainer) {
|
|
||||||
ContainerAllocation result =
|
|
||||||
preAllocation(clusterResource, node, schedulingMode,
|
|
||||||
resourceLimits, priority, reservedContainer);
|
|
||||||
|
|
||||||
if (AllocationState.ALLOCATED == result.state
|
|
||||||
|| AllocationState.RESERVED == result.state) {
|
|
||||||
result = doAllocation(result, clusterResource, node,
|
|
||||||
schedulingMode, priority, reservedContainer);
|
|
||||||
}
|
|
||||||
|
|
||||||
return result;
|
|
||||||
}
|
|
||||||
}
|
}
|
|
@ -34,6 +34,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAssignment;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
|
||||||
|
@ -154,7 +155,6 @@ public class RegularContainerAllocator extends ContainerAllocator {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
ContainerAllocation preAllocation(Resource clusterResource,
|
ContainerAllocation preAllocation(Resource clusterResource,
|
||||||
FiCaSchedulerNode node, SchedulingMode schedulingMode,
|
FiCaSchedulerNode node, SchedulingMode schedulingMode,
|
||||||
ResourceLimits resourceLimits, Priority priority,
|
ResourceLimits resourceLimits, Priority priority,
|
||||||
|
@ -295,14 +295,14 @@ public class RegularContainerAllocator extends ContainerAllocator {
|
||||||
schedulingMode, currentResoureLimits);
|
schedulingMode, currentResoureLimits);
|
||||||
}
|
}
|
||||||
|
|
||||||
return ContainerAllocation.QUEUE_SKIPPED;
|
return ContainerAllocation.APP_SKIPPED;
|
||||||
}
|
}
|
||||||
|
|
||||||
private ContainerAllocation assignContainersOnNode(Resource clusterResource,
|
private ContainerAllocation assignContainersOnNode(Resource clusterResource,
|
||||||
FiCaSchedulerNode node, Priority priority, RMContainer reservedContainer,
|
FiCaSchedulerNode node, Priority priority, RMContainer reservedContainer,
|
||||||
SchedulingMode schedulingMode, ResourceLimits currentResoureLimits) {
|
SchedulingMode schedulingMode, ResourceLimits currentResoureLimits) {
|
||||||
|
|
||||||
ContainerAllocation assigned;
|
ContainerAllocation allocation;
|
||||||
|
|
||||||
NodeType requestType = null;
|
NodeType requestType = null;
|
||||||
// Data-local
|
// Data-local
|
||||||
|
@ -310,14 +310,14 @@ public class RegularContainerAllocator extends ContainerAllocator {
|
||||||
application.getResourceRequest(priority, node.getNodeName());
|
application.getResourceRequest(priority, node.getNodeName());
|
||||||
if (nodeLocalResourceRequest != null) {
|
if (nodeLocalResourceRequest != null) {
|
||||||
requestType = NodeType.NODE_LOCAL;
|
requestType = NodeType.NODE_LOCAL;
|
||||||
assigned =
|
allocation =
|
||||||
assignNodeLocalContainers(clusterResource, nodeLocalResourceRequest,
|
assignNodeLocalContainers(clusterResource, nodeLocalResourceRequest,
|
||||||
node, priority, reservedContainer, schedulingMode,
|
node, priority, reservedContainer, schedulingMode,
|
||||||
currentResoureLimits);
|
currentResoureLimits);
|
||||||
if (Resources.greaterThan(rc, clusterResource,
|
if (Resources.greaterThan(rc, clusterResource,
|
||||||
assigned.getResourceToBeAllocated(), Resources.none())) {
|
allocation.getResourceToBeAllocated(), Resources.none())) {
|
||||||
assigned.requestNodeType = requestType;
|
allocation.requestNodeType = requestType;
|
||||||
return assigned;
|
return allocation;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -333,14 +333,14 @@ public class RegularContainerAllocator extends ContainerAllocator {
|
||||||
requestType = NodeType.RACK_LOCAL;
|
requestType = NodeType.RACK_LOCAL;
|
||||||
}
|
}
|
||||||
|
|
||||||
assigned =
|
allocation =
|
||||||
assignRackLocalContainers(clusterResource, rackLocalResourceRequest,
|
assignRackLocalContainers(clusterResource, rackLocalResourceRequest,
|
||||||
node, priority, reservedContainer, schedulingMode,
|
node, priority, reservedContainer, schedulingMode,
|
||||||
currentResoureLimits);
|
currentResoureLimits);
|
||||||
if (Resources.greaterThan(rc, clusterResource,
|
if (Resources.greaterThan(rc, clusterResource,
|
||||||
assigned.getResourceToBeAllocated(), Resources.none())) {
|
allocation.getResourceToBeAllocated(), Resources.none())) {
|
||||||
assigned.requestNodeType = requestType;
|
allocation.requestNodeType = requestType;
|
||||||
return assigned;
|
return allocation;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -356,13 +356,19 @@ public class RegularContainerAllocator extends ContainerAllocator {
|
||||||
requestType = NodeType.OFF_SWITCH;
|
requestType = NodeType.OFF_SWITCH;
|
||||||
}
|
}
|
||||||
|
|
||||||
assigned =
|
allocation =
|
||||||
assignOffSwitchContainers(clusterResource, offSwitchResourceRequest,
|
assignOffSwitchContainers(clusterResource, offSwitchResourceRequest,
|
||||||
node, priority, reservedContainer, schedulingMode,
|
node, priority, reservedContainer, schedulingMode,
|
||||||
currentResoureLimits);
|
currentResoureLimits);
|
||||||
assigned.requestNodeType = requestType;
|
allocation.requestNodeType = requestType;
|
||||||
|
|
||||||
|
// When a returned allocation is LOCALITY_SKIPPED, since we're in
|
||||||
|
// off-switch request now, we will skip this app w.r.t priorities
|
||||||
|
if (allocation.state == AllocationState.LOCALITY_SKIPPED) {
|
||||||
|
allocation.state = AllocationState.APP_SKIPPED;
|
||||||
|
}
|
||||||
|
|
||||||
return assigned;
|
return allocation;
|
||||||
}
|
}
|
||||||
|
|
||||||
return ContainerAllocation.PRIORITY_SKIPPED;
|
return ContainerAllocation.PRIORITY_SKIPPED;
|
||||||
|
@ -388,7 +394,7 @@ public class RegularContainerAllocator extends ContainerAllocator {
|
||||||
// to label not match. This can be caused by node label changed
|
// to label not match. This can be caused by node label changed
|
||||||
// We should un-reserve this container.
|
// We should un-reserve this container.
|
||||||
return new ContainerAllocation(rmContainer, null,
|
return new ContainerAllocation(rmContainer, null,
|
||||||
AllocationState.QUEUE_SKIPPED);
|
AllocationState.LOCALITY_SKIPPED);
|
||||||
}
|
}
|
||||||
|
|
||||||
Resource capability = request.getCapability();
|
Resource capability = request.getCapability();
|
||||||
|
@ -400,7 +406,8 @@ public class RegularContainerAllocator extends ContainerAllocator {
|
||||||
LOG.warn("Node : " + node.getNodeID()
|
LOG.warn("Node : " + node.getNodeID()
|
||||||
+ " does not have sufficient resource for request : " + request
|
+ " does not have sufficient resource for request : " + request
|
||||||
+ " node total capability : " + node.getTotalResource());
|
+ " node total capability : " + node.getTotalResource());
|
||||||
return ContainerAllocation.QUEUE_SKIPPED;
|
// Skip this locality request
|
||||||
|
return ContainerAllocation.LOCALITY_SKIPPED;
|
||||||
}
|
}
|
||||||
|
|
||||||
assert Resources.greaterThan(
|
assert Resources.greaterThan(
|
||||||
|
@ -457,7 +464,8 @@ public class RegularContainerAllocator extends ContainerAllocator {
|
||||||
// continue)). If we failed to unreserve some resource, we can't
|
// continue)). If we failed to unreserve some resource, we can't
|
||||||
// continue.
|
// continue.
|
||||||
if (null == unreservedContainer) {
|
if (null == unreservedContainer) {
|
||||||
return ContainerAllocation.QUEUE_SKIPPED;
|
// Skip the locality request
|
||||||
|
return ContainerAllocation.LOCALITY_SKIPPED;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -468,19 +476,20 @@ public class RegularContainerAllocator extends ContainerAllocator {
|
||||||
result.containerNodeType = type;
|
result.containerNodeType = type;
|
||||||
return result;
|
return result;
|
||||||
} else {
|
} else {
|
||||||
// if we are allowed to allocate but this node doesn't have space, reserve it or
|
// if we are allowed to allocate but this node doesn't have space, reserve
|
||||||
// if this was an already a reserved container, reserve it again
|
// it or if this was an already a reserved container, reserve it again
|
||||||
if (shouldAllocOrReserveNewContainer || rmContainer != null) {
|
if (shouldAllocOrReserveNewContainer || rmContainer != null) {
|
||||||
|
|
||||||
if (reservationsContinueLooking && rmContainer == null) {
|
if (reservationsContinueLooking && rmContainer == null) {
|
||||||
// we could possibly ignoring queue capacity or user limits when
|
// we could possibly ignoring queue capacity or user limits when
|
||||||
// reservationsContinueLooking is set. Make sure we didn't need to unreserve
|
// reservationsContinueLooking is set. Make sure we didn't need to
|
||||||
// one.
|
// unreserve one.
|
||||||
if (needToUnreserve) {
|
if (needToUnreserve) {
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
LOG.debug("we needed to unreserve to be able to allocate");
|
LOG.debug("we needed to unreserve to be able to allocate");
|
||||||
}
|
}
|
||||||
return ContainerAllocation.QUEUE_SKIPPED;
|
// Skip the locality request
|
||||||
|
return ContainerAllocation.LOCALITY_SKIPPED;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -490,7 +499,8 @@ public class RegularContainerAllocator extends ContainerAllocator {
|
||||||
result.containerNodeType = type;
|
result.containerNodeType = type;
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
return ContainerAllocation.QUEUE_SKIPPED;
|
// Skip the locality request
|
||||||
|
return ContainerAllocation.LOCALITY_SKIPPED;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -563,8 +573,7 @@ public class RegularContainerAllocator extends ContainerAllocator {
|
||||||
// Skip this app if we failed to allocate.
|
// Skip this app if we failed to allocate.
|
||||||
ContainerAllocation ret =
|
ContainerAllocation ret =
|
||||||
new ContainerAllocation(allocationResult.containerToBeUnreserved,
|
new ContainerAllocation(allocationResult.containerToBeUnreserved,
|
||||||
null, AllocationState.QUEUE_SKIPPED);
|
null, AllocationState.APP_SKIPPED);
|
||||||
ret.state = AllocationState.APP_SKIPPED;
|
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -578,7 +587,6 @@ public class RegularContainerAllocator extends ContainerAllocator {
|
||||||
return allocationResult;
|
return allocationResult;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
ContainerAllocation doAllocation(ContainerAllocation allocationResult,
|
ContainerAllocation doAllocation(ContainerAllocation allocationResult,
|
||||||
Resource clusterResource, FiCaSchedulerNode node,
|
Resource clusterResource, FiCaSchedulerNode node,
|
||||||
SchedulingMode schedulingMode, Priority priority,
|
SchedulingMode schedulingMode, Priority priority,
|
||||||
|
@ -591,7 +599,7 @@ public class RegularContainerAllocator extends ContainerAllocator {
|
||||||
// something went wrong getting/creating the container
|
// something went wrong getting/creating the container
|
||||||
if (container == null) {
|
if (container == null) {
|
||||||
LOG.warn("Couldn't get container for allocation!");
|
LOG.warn("Couldn't get container for allocation!");
|
||||||
return ContainerAllocation.QUEUE_SKIPPED;
|
return ContainerAllocation.APP_SKIPPED;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (allocationResult.getAllocationState() == AllocationState.ALLOCATED) {
|
if (allocationResult.getAllocationState() == AllocationState.ALLOCATED) {
|
||||||
|
@ -626,4 +634,65 @@ public class RegularContainerAllocator extends ContainerAllocator {
|
||||||
|
|
||||||
return allocationResult;
|
return allocationResult;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private ContainerAllocation allocate(Resource clusterResource,
|
||||||
|
FiCaSchedulerNode node, SchedulingMode schedulingMode,
|
||||||
|
ResourceLimits resourceLimits, Priority priority,
|
||||||
|
RMContainer reservedContainer) {
|
||||||
|
ContainerAllocation result =
|
||||||
|
preAllocation(clusterResource, node, schedulingMode, resourceLimits,
|
||||||
|
priority, reservedContainer);
|
||||||
|
|
||||||
|
if (AllocationState.ALLOCATED == result.state
|
||||||
|
|| AllocationState.RESERVED == result.state) {
|
||||||
|
result =
|
||||||
|
doAllocation(result, clusterResource, node, schedulingMode, priority,
|
||||||
|
reservedContainer);
|
||||||
|
}
|
||||||
|
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public CSAssignment assignContainers(Resource clusterResource,
|
||||||
|
FiCaSchedulerNode node, SchedulingMode schedulingMode,
|
||||||
|
ResourceLimits resourceLimits,
|
||||||
|
RMContainer reservedContainer) {
|
||||||
|
if (reservedContainer == null) {
|
||||||
|
// Check if application needs more resource, skip if it doesn't need more.
|
||||||
|
if (!application.hasPendingResourceRequest(rc,
|
||||||
|
node.getPartition(), clusterResource, schedulingMode)) {
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug("Skip app_attempt=" + application.getApplicationAttemptId()
|
||||||
|
+ ", because it doesn't need more resource, schedulingMode="
|
||||||
|
+ schedulingMode.name() + " node-label=" + node.getPartition());
|
||||||
|
}
|
||||||
|
return CSAssignment.SKIP_ASSIGNMENT;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Schedule in priority order
|
||||||
|
for (Priority priority : application.getPriorities()) {
|
||||||
|
ContainerAllocation result =
|
||||||
|
allocate(clusterResource, node, schedulingMode, resourceLimits,
|
||||||
|
priority, null);
|
||||||
|
|
||||||
|
AllocationState allocationState = result.getAllocationState();
|
||||||
|
if (allocationState == AllocationState.PRIORITY_SKIPPED) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
return getCSAssignmentFromAllocateResult(clusterResource, result,
|
||||||
|
null);
|
||||||
|
}
|
||||||
|
|
||||||
|
// We will reach here if we skipped all priorities of the app, so we will
|
||||||
|
// skip the app.
|
||||||
|
return CSAssignment.SKIP_ASSIGNMENT;
|
||||||
|
} else {
|
||||||
|
ContainerAllocation result =
|
||||||
|
allocate(clusterResource, node, schedulingMode, resourceLimits,
|
||||||
|
reservedContainer.getReservedPriority(), reservedContainer);
|
||||||
|
return getCSAssignmentFromAllocateResult(clusterResource, result,
|
||||||
|
reservedContainer);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -57,10 +57,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.Capacity
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocator.AllocationState;
|
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocator.ContainerAllocator;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocator.ContainerAllocator;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocator.RegularContainerAllocator;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocator.RegularContainerAllocator;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocator.ContainerAllocation;
|
|
||||||
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
|
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
|
||||||
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
|
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
|
||||||
import org.apache.hadoop.yarn.util.resource.Resources;
|
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||||
|
@ -280,7 +278,7 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
public synchronized void addPreemptContainer(ContainerId cont){
|
public synchronized void addPreemptContainer(ContainerId cont) {
|
||||||
// ignore already completed containers
|
// ignore already completed containers
|
||||||
if (liveContainers.containsKey(cont)) {
|
if (liveContainers.containsKey(cont)) {
|
||||||
containersToPreempt.add(cont);
|
containersToPreempt.add(cont);
|
||||||
|
@ -430,112 +428,19 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
|
||||||
public LeafQueue getCSLeafQueue() {
|
public LeafQueue getCSLeafQueue() {
|
||||||
return (LeafQueue)queue;
|
return (LeafQueue)queue;
|
||||||
}
|
}
|
||||||
|
|
||||||
private CSAssignment getCSAssignmentFromAllocateResult(
|
|
||||||
Resource clusterResource, ContainerAllocation result) {
|
|
||||||
// Handle skipped
|
|
||||||
boolean skipped =
|
|
||||||
(result.getAllocationState() == AllocationState.APP_SKIPPED);
|
|
||||||
CSAssignment assignment = new CSAssignment(skipped);
|
|
||||||
assignment.setApplication(this);
|
|
||||||
|
|
||||||
// Handle excess reservation
|
|
||||||
assignment.setExcessReservation(result.getContainerToBeUnreserved());
|
|
||||||
|
|
||||||
// If we allocated something
|
|
||||||
if (Resources.greaterThan(rc, clusterResource,
|
|
||||||
result.getResourceToBeAllocated(), Resources.none())) {
|
|
||||||
Resource allocatedResource = result.getResourceToBeAllocated();
|
|
||||||
Container updatedContainer = result.getUpdatedContainer();
|
|
||||||
|
|
||||||
assignment.setResource(allocatedResource);
|
|
||||||
assignment.setType(result.getContainerNodeType());
|
|
||||||
|
|
||||||
if (result.getAllocationState() == AllocationState.RESERVED) {
|
|
||||||
// This is a reserved container
|
|
||||||
LOG.info("Reserved container " + " application=" + getApplicationId()
|
|
||||||
+ " resource=" + allocatedResource + " queue="
|
|
||||||
+ this.toString() + " cluster=" + clusterResource);
|
|
||||||
assignment.getAssignmentInformation().addReservationDetails(
|
|
||||||
updatedContainer.getId(), getCSLeafQueue().getQueuePath());
|
|
||||||
assignment.getAssignmentInformation().incrReservations();
|
|
||||||
Resources.addTo(assignment.getAssignmentInformation().getReserved(),
|
|
||||||
allocatedResource);
|
|
||||||
assignment.setFulfilledReservation(true);
|
|
||||||
} else {
|
|
||||||
// This is a new container
|
|
||||||
// Inform the ordering policy
|
|
||||||
LOG.info("assignedContainer" + " application attempt="
|
|
||||||
+ getApplicationAttemptId() + " container="
|
|
||||||
+ updatedContainer.getId() + " queue=" + this + " clusterResource="
|
|
||||||
+ clusterResource);
|
|
||||||
|
|
||||||
getCSLeafQueue().getOrderingPolicy().containerAllocated(this,
|
|
||||||
getRMContainer(updatedContainer.getId()));
|
|
||||||
|
|
||||||
assignment.getAssignmentInformation().addAllocationDetails(
|
|
||||||
updatedContainer.getId(), getCSLeafQueue().getQueuePath());
|
|
||||||
assignment.getAssignmentInformation().incrAllocations();
|
|
||||||
Resources.addTo(assignment.getAssignmentInformation().getAllocated(),
|
|
||||||
allocatedResource);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return assignment;
|
|
||||||
}
|
|
||||||
|
|
||||||
public CSAssignment assignContainers(Resource clusterResource,
|
public CSAssignment assignContainers(Resource clusterResource,
|
||||||
FiCaSchedulerNode node, ResourceLimits currentResourceLimits,
|
FiCaSchedulerNode node, ResourceLimits currentResourceLimits,
|
||||||
SchedulingMode schedulingMode) {
|
SchedulingMode schedulingMode, RMContainer reservedContainer) {
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
LOG.debug("pre-assignContainers for application "
|
LOG.debug("pre-assignContainers for application "
|
||||||
+ getApplicationId());
|
+ getApplicationId());
|
||||||
showRequests();
|
showRequests();
|
||||||
}
|
}
|
||||||
|
|
||||||
// Check if application needs more resource, skip if it doesn't need more.
|
|
||||||
if (!hasPendingResourceRequest(rc,
|
|
||||||
node.getPartition(), clusterResource, schedulingMode)) {
|
|
||||||
if (LOG.isDebugEnabled()) {
|
|
||||||
LOG.debug("Skip app_attempt=" + getApplicationAttemptId()
|
|
||||||
+ ", because it doesn't need more resource, schedulingMode="
|
|
||||||
+ schedulingMode.name() + " node-label=" + node.getPartition());
|
|
||||||
}
|
|
||||||
return CSAssignment.SKIP_ASSIGNMENT;
|
|
||||||
}
|
|
||||||
|
|
||||||
synchronized (this) {
|
synchronized (this) {
|
||||||
// Schedule in priority order
|
return containerAllocator.assignContainers(clusterResource, node,
|
||||||
for (Priority priority : getPriorities()) {
|
schedulingMode, currentResourceLimits, reservedContainer);
|
||||||
ContainerAllocation allocationResult =
|
|
||||||
containerAllocator.allocate(clusterResource, node,
|
|
||||||
schedulingMode, currentResourceLimits, priority, null);
|
|
||||||
|
|
||||||
// If it's a skipped allocation
|
|
||||||
AllocationState allocationState = allocationResult.getAllocationState();
|
|
||||||
|
|
||||||
if (allocationState == AllocationState.PRIORITY_SKIPPED) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
return getCSAssignmentFromAllocateResult(clusterResource,
|
|
||||||
allocationResult);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// We will reach here if we skipped all priorities of the app, so we will
|
|
||||||
// skip the app.
|
|
||||||
return CSAssignment.SKIP_ASSIGNMENT;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
public synchronized CSAssignment assignReservedContainer(
|
|
||||||
FiCaSchedulerNode node, RMContainer rmContainer,
|
|
||||||
Resource clusterResource, SchedulingMode schedulingMode) {
|
|
||||||
ContainerAllocation result =
|
|
||||||
containerAllocator.allocate(clusterResource, node,
|
|
||||||
schedulingMode, new ResourceLimits(Resources.none()),
|
|
||||||
rmContainer.getReservedPriority(), rmContainer);
|
|
||||||
|
|
||||||
return getCSAssignmentFromAllocateResult(clusterResource, result);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -622,16 +622,9 @@ public class TestLeafQueue {
|
||||||
final ApplicationAttemptId appAttemptId_1 =
|
final ApplicationAttemptId appAttemptId_1 =
|
||||||
TestUtils.getMockApplicationAttemptId(1, 0);
|
TestUtils.getMockApplicationAttemptId(1, 0);
|
||||||
FiCaSchedulerApp app_1 =
|
FiCaSchedulerApp app_1 =
|
||||||
new FiCaSchedulerApp(appAttemptId_1, user_0, a,
|
new FiCaSchedulerApp(appAttemptId_1, user_1, a,
|
||||||
a.getActiveUsersManager(), spyRMContext);
|
a.getActiveUsersManager(), spyRMContext);
|
||||||
a.submitApplicationAttempt(app_1, user_0); // same user
|
a.submitApplicationAttempt(app_1, user_1); // different user
|
||||||
|
|
||||||
final ApplicationAttemptId appAttemptId_2 =
|
|
||||||
TestUtils.getMockApplicationAttemptId(2, 0);
|
|
||||||
FiCaSchedulerApp app_2 =
|
|
||||||
new FiCaSchedulerApp(appAttemptId_2, user_1, a,
|
|
||||||
a.getActiveUsersManager(), spyRMContext);
|
|
||||||
a.submitApplicationAttempt(app_2, user_1);
|
|
||||||
|
|
||||||
// Setup some nodes
|
// Setup some nodes
|
||||||
String host_0 = "127.0.0.1";
|
String host_0 = "127.0.0.1";
|
||||||
|
@ -647,7 +640,7 @@ public class TestLeafQueue {
|
||||||
// Setup resource-requests
|
// Setup resource-requests
|
||||||
Priority priority = TestUtils.createMockPriority(1);
|
Priority priority = TestUtils.createMockPriority(1);
|
||||||
app_0.updateResourceRequests(Collections.singletonList(
|
app_0.updateResourceRequests(Collections.singletonList(
|
||||||
TestUtils.createResourceRequest(ResourceRequest.ANY, 2*GB, 1, true,
|
TestUtils.createResourceRequest(ResourceRequest.ANY, 3*GB, 2, true,
|
||||||
priority, recordFactory)));
|
priority, recordFactory)));
|
||||||
|
|
||||||
app_1.updateResourceRequests(Collections.singletonList(
|
app_1.updateResourceRequests(Collections.singletonList(
|
||||||
|
@ -662,39 +655,38 @@ public class TestLeafQueue {
|
||||||
a.setUserLimit(50);
|
a.setUserLimit(50);
|
||||||
a.setUserLimitFactor(2);
|
a.setUserLimitFactor(2);
|
||||||
|
|
||||||
// Now, only user_0 should be active since he is the only one with
|
// There're two active users
|
||||||
// outstanding requests
|
assertEquals(2, a.getActiveUsersManager().getNumActiveUsers());
|
||||||
assertEquals("There should only be 1 active user!",
|
|
||||||
1, a.getActiveUsersManager().getNumActiveUsers());
|
|
||||||
|
|
||||||
// This commented code is key to test 'activeUsers'.
|
|
||||||
// It should fail the test if uncommented since
|
|
||||||
// it would increase 'activeUsers' to 2 and stop user_2
|
|
||||||
// Pre MAPREDUCE-3732 this test should fail without this block too
|
|
||||||
// app_2.updateResourceRequests(Collections.singletonList(
|
|
||||||
// TestUtils.createResourceRequest(RMNodeImpl.ANY, 1*GB, 1, priority,
|
|
||||||
// recordFactory)));
|
|
||||||
|
|
||||||
// 1 container to user_0
|
// 1 container to user_0
|
||||||
a.assignContainers(clusterResource, node_0,
|
a.assignContainers(clusterResource, node_0,
|
||||||
new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
|
new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
|
||||||
assertEquals(2*GB, a.getUsedResources().getMemory());
|
assertEquals(3*GB, a.getUsedResources().getMemory());
|
||||||
assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
|
assertEquals(3*GB, app_0.getCurrentConsumption().getMemory());
|
||||||
assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
|
assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
|
||||||
|
|
||||||
// Again one to user_0 since he hasn't exceeded user limit yet
|
// Allocate one container to app_1. Even if app_0
|
||||||
|
// submit earlier, it cannot get this container assigned since user_0
|
||||||
|
// exceeded user-limit already.
|
||||||
a.assignContainers(clusterResource, node_0,
|
a.assignContainers(clusterResource, node_0,
|
||||||
new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
|
new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
|
||||||
assertEquals(3*GB, a.getUsedResources().getMemory());
|
assertEquals(4*GB, a.getUsedResources().getMemory());
|
||||||
assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
|
assertEquals(3*GB, app_0.getCurrentConsumption().getMemory());
|
||||||
assertEquals(1*GB, app_1.getCurrentConsumption().getMemory());
|
assertEquals(1*GB, app_1.getCurrentConsumption().getMemory());
|
||||||
|
|
||||||
// One more to user_0 since he is the only active user
|
// Allocate one container to app_0, before allocating this container,
|
||||||
|
// user-limit = ceil((4 + 1) / 2) = 3G. app_0's used resource (3G) <=
|
||||||
|
// user-limit.
|
||||||
a.assignContainers(clusterResource, node_1,
|
a.assignContainers(clusterResource, node_1,
|
||||||
new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
|
new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
|
||||||
assertEquals(4*GB, a.getUsedResources().getMemory());
|
assertEquals(7*GB, a.getUsedResources().getMemory());
|
||||||
assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
|
assertEquals(6*GB, app_0.getCurrentConsumption().getMemory());
|
||||||
assertEquals(2*GB, app_1.getCurrentConsumption().getMemory());
|
assertEquals(1*GB, app_1.getCurrentConsumption().getMemory());
|
||||||
|
|
||||||
|
// app_0 doesn't have outstanding resources, there's only one active user.
|
||||||
|
assertEquals("There should only be 1 active user!",
|
||||||
|
1, a.getActiveUsersManager().getNumActiveUsers());
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -2569,6 +2561,96 @@ public class TestLeafQueue {
|
||||||
Assert.assertEquals(3*GB, app_0.getCurrentConsumption().getMemory());
|
Assert.assertEquals(3*GB, app_0.getCurrentConsumption().getMemory());
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testLocalityDelaySkipsApplication() throws Exception {
|
||||||
|
|
||||||
|
// Manipulate queue 'a'
|
||||||
|
LeafQueue a = stubLeafQueue((LeafQueue)queues.get(A));
|
||||||
|
|
||||||
|
// User
|
||||||
|
String user_0 = "user_0";
|
||||||
|
|
||||||
|
// Submit applications
|
||||||
|
final ApplicationAttemptId appAttemptId_0 =
|
||||||
|
TestUtils.getMockApplicationAttemptId(0, 0);
|
||||||
|
FiCaSchedulerApp app_0 =
|
||||||
|
new FiCaSchedulerApp(appAttemptId_0, user_0, a,
|
||||||
|
mock(ActiveUsersManager.class), spyRMContext);
|
||||||
|
a.submitApplicationAttempt(app_0, user_0);
|
||||||
|
final ApplicationAttemptId appAttemptId_1 =
|
||||||
|
TestUtils.getMockApplicationAttemptId(1, 0);
|
||||||
|
FiCaSchedulerApp app_1 =
|
||||||
|
new FiCaSchedulerApp(appAttemptId_1, user_0, a,
|
||||||
|
mock(ActiveUsersManager.class), spyRMContext);
|
||||||
|
a.submitApplicationAttempt(app_1, user_0);
|
||||||
|
|
||||||
|
// Setup some nodes and racks
|
||||||
|
String host_0 = "127.0.0.1";
|
||||||
|
String rack_0 = "rack_0";
|
||||||
|
FiCaSchedulerNode node_0 = TestUtils.getMockNode(host_0, rack_0, 0, 8*GB);
|
||||||
|
|
||||||
|
String host_1 = "127.0.0.2";
|
||||||
|
String rack_1 = "rack_1";
|
||||||
|
FiCaSchedulerNode node_1 = TestUtils.getMockNode(host_1, rack_1, 0, 8*GB);
|
||||||
|
|
||||||
|
String host_2 = "127.0.0.3";
|
||||||
|
String rack_2 = "rack_2";
|
||||||
|
FiCaSchedulerNode node_2 = TestUtils.getMockNode(host_2, rack_2, 0, 8*GB);
|
||||||
|
|
||||||
|
final int numNodes = 3;
|
||||||
|
Resource clusterResource =
|
||||||
|
Resources.createResource(numNodes * (8*GB), numNodes * 16);
|
||||||
|
when(csContext.getNumClusterNodes()).thenReturn(numNodes);
|
||||||
|
|
||||||
|
// Setup resource-requests and submit
|
||||||
|
// App0 has node local request for host_0/host_1, and app1 has node local
|
||||||
|
// request for host2.
|
||||||
|
Priority priority = TestUtils.createMockPriority(1);
|
||||||
|
List<ResourceRequest> app_0_requests_0 = new ArrayList<ResourceRequest>();
|
||||||
|
app_0_requests_0.add(
|
||||||
|
TestUtils.createResourceRequest(host_0, 1*GB, 1,
|
||||||
|
true, priority, recordFactory));
|
||||||
|
app_0_requests_0.add(
|
||||||
|
TestUtils.createResourceRequest(rack_0, 1*GB, 1,
|
||||||
|
true, priority, recordFactory));
|
||||||
|
app_0_requests_0.add(
|
||||||
|
TestUtils.createResourceRequest(host_1, 1*GB, 1,
|
||||||
|
true, priority, recordFactory));
|
||||||
|
app_0_requests_0.add(
|
||||||
|
TestUtils.createResourceRequest(rack_1, 1*GB, 1,
|
||||||
|
true, priority, recordFactory));
|
||||||
|
app_0_requests_0.add(
|
||||||
|
TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 3, // one extra
|
||||||
|
true, priority, recordFactory));
|
||||||
|
app_0.updateResourceRequests(app_0_requests_0);
|
||||||
|
|
||||||
|
List<ResourceRequest> app_1_requests_0 = new ArrayList<ResourceRequest>();
|
||||||
|
app_1_requests_0.add(
|
||||||
|
TestUtils.createResourceRequest(host_2, 1*GB, 1,
|
||||||
|
true, priority, recordFactory));
|
||||||
|
app_1_requests_0.add(
|
||||||
|
TestUtils.createResourceRequest(rack_2, 1*GB, 1,
|
||||||
|
true, priority, recordFactory));
|
||||||
|
app_1_requests_0.add(
|
||||||
|
TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 1, // one extra
|
||||||
|
true, priority, recordFactory));
|
||||||
|
app_1.updateResourceRequests(app_1_requests_0);
|
||||||
|
|
||||||
|
// Start testing...
|
||||||
|
// When doing allocation, even if app_0 submit earlier than app_1, app_1 can
|
||||||
|
// still get allocated because app_0 is waiting for node-locality-delay
|
||||||
|
CSAssignment assignment = null;
|
||||||
|
|
||||||
|
// Check app_0's scheduling opportunities increased and app_1 get allocated
|
||||||
|
assignment = a.assignContainers(clusterResource, node_2,
|
||||||
|
new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
|
||||||
|
verifyContainerAllocated(assignment, NodeType.NODE_LOCAL);
|
||||||
|
assertEquals(1, app_0.getSchedulingOpportunities(priority));
|
||||||
|
assertEquals(3, app_0.getTotalRequiredResources(priority));
|
||||||
|
assertEquals(0, app_0.getLiveContainers().size());
|
||||||
|
assertEquals(1, app_1.getLiveContainers().size());
|
||||||
|
}
|
||||||
|
|
||||||
private List<FiCaSchedulerApp> createListOfApps(int noOfApps, String user,
|
private List<FiCaSchedulerApp> createListOfApps(int noOfApps, String user,
|
||||||
LeafQueue defaultQueue) {
|
LeafQueue defaultQueue) {
|
||||||
|
|
Loading…
Reference in New Issue