From 2977bc6a141041ef7579efc416e93fc55e0c2a1a Mon Sep 17 00:00:00 2001 From: Arun Suresh Date: Fri, 6 Jan 2017 09:59:57 -0800 Subject: [PATCH] YARN-6040. Introduce api independent PendingAsk to replace usage of ResourceRequest within Scheduler classes. (Wangda Tan via asuresh) --- .../scheduler/AppSchedulingInfo.java | 171 +++++++------ .../SchedulerApplicationAttempt.java | 48 ++-- .../allocator/AbstractContainerAllocator.java | 4 + .../allocator/RegularContainerAllocator.java | 132 ++++------ .../scheduler/common/PendingAsk.java | 57 +++++ .../common/fica/FiCaSchedulerApp.java | 55 ++-- .../scheduler/fair/FSAppAttempt.java | 160 ++++++------ .../scheduler/fair/FSPreemptionThread.java | 11 +- .../scheduler/fifo/FifoAppAttempt.java | 10 +- .../scheduler/fifo/FifoScheduler.java | 121 ++++----- .../LocalitySchedulingPlacementSet.java | 129 ++++++++-- .../placement/SchedulingPlacementSet.java | 71 +++++- .../scheduler/TestAbstractYarnScheduler.java | 15 +- .../scheduler/TestAppSchedulingInfo.java | 4 +- .../TestSchedulerApplicationAttempt.java | 3 +- .../capacity/TestCapacityScheduler.java | 238 +++++++++--------- .../scheduler/capacity/TestLeafQueue.java | 67 ++--- .../TestNodeLabelContainerAllocation.java | 65 +++-- .../scheduler/capacity/TestReservations.java | 22 +- 19 files changed, 809 insertions(+), 574 deletions(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/PendingAsk.java diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java index d901d90bc41..e2ff08287ff 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java @@ -34,16 +34,18 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.LocalitySchedulingPlacementSet; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.ResourceRequestUpdateResult; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SchedulingPlacementSet; import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; + +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.PendingAsk; import org.apache.hadoop.yarn.util.resource.Resources; import java.util.ArrayList; import java.util.Collection; -import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -582,16 +584,10 @@ public class AppSchedulingInfo { return schedulerKeys.keySet(); } - @SuppressWarnings("unchecked") - public Map 
getResourceRequests( - SchedulerRequestKey schedulerKey) { - SchedulingPlacementSet ps = schedulerKeyToPlacementSets.get(schedulerKey); - if (null != ps) { - return ps.getResourceRequests(); - } - return Collections.emptyMap(); - } - + /** + * Used by REST API to fetch ResourceRequest + * @return All pending ResourceRequests. + */ public List getAllResourceRequests() { List ret = new ArrayList<>(); try { @@ -605,53 +601,51 @@ public class AppSchedulingInfo { return ret; } - public ResourceRequest getResourceRequest(SchedulerRequestKey schedulerKey, + public SchedulingPlacementSet getFirstSchedulingPlacementSet() { + try { + readLock.lock(); + for (SchedulerRequestKey key : schedulerKeys.keySet()) { + SchedulingPlacementSet ps = schedulerKeyToPlacementSets.get(key); + if (null != ps) { + return ps; + } + } + return null; + } finally { + readLock.unlock(); + } + + } + + public PendingAsk getNextPendingAsk() { + try { + readLock.lock(); + SchedulerRequestKey firstRequestKey = schedulerKeys.firstKey(); + return getPendingAsk(firstRequestKey, ResourceRequest.ANY); + } finally { + readLock.unlock(); + } + + } + + public PendingAsk getPendingAsk(SchedulerRequestKey schedulerKey) { + return getPendingAsk(schedulerKey, ResourceRequest.ANY); + } + + public PendingAsk getPendingAsk(SchedulerRequestKey schedulerKey, String resourceName) { try { this.readLock.lock(); - SchedulingPlacementSet ps = - schedulerKeyToPlacementSets.get(schedulerKey); - return (ps == null) ? null : ps.getResourceRequest(resourceName); + SchedulingPlacementSet ps = schedulerKeyToPlacementSets.get(schedulerKey); + return (ps == null) ? PendingAsk.ZERO : ps.getPendingAsk(resourceName); } finally { this.readLock.unlock(); } } - public Resource getResource(SchedulerRequestKey schedulerKey) { - try { - this.readLock.lock(); - ResourceRequest request = - getResourceRequest(schedulerKey, ResourceRequest.ANY); - return (request == null) ? null : request.getCapability(); - } finally { - this.readLock.unlock(); - } - } - - /** - * Method to return the next resource request to be serviced. - * - * In the initial implementation, we just pick any {@link ResourceRequest} - * corresponding to the highest priority. - * - * @return next {@link ResourceRequest} to allocate resources for. - */ - @Unstable - public synchronized ResourceRequest getNextResourceRequest() { - SchedulingPlacementSet ps = schedulerKeyToPlacementSets.get( - schedulerKeys.firstKey()); - if (null != ps) { - for (ResourceRequest rr : ps.getResourceRequests().values()) { - return rr; - } - } - - return null; - } - /** * Returns if the place (node/rack today) is either blacklisted by the - * application (user) or the system + * application (user) or the system. 
* * @param resourceName * the resourcename @@ -724,7 +718,6 @@ public class AppSchedulingInfo { public List allocate(NodeType type, SchedulerNode node, SchedulerRequestKey schedulerKey, - ResourceRequest request, Container containerAllocated) { try { writeLock.lock(); @@ -733,19 +726,13 @@ public class AppSchedulingInfo { updateMetricsForAllocatedContainer(type, containerAllocated); } - return schedulerKeyToPlacementSets.get(schedulerKey) - .allocate(schedulerKey, type, node, request); + return schedulerKeyToPlacementSets.get(schedulerKey).allocate( + schedulerKey, type, node); } finally { writeLock.unlock(); } } - public List allocate(NodeType type, - SchedulerNode node, SchedulerRequestKey schedulerKey, - Container containerAllocated) { - return allocate(type, node, schedulerKey, null, containerAllocated); - } - public void checkForDeactivation() { if (schedulerKeys.isEmpty()) { activeUsersManager.deactivateApplication(user, applicationId); @@ -758,18 +745,20 @@ public class AppSchedulingInfo { QueueMetrics oldMetrics = queue.getMetrics(); QueueMetrics newMetrics = newQueue.getMetrics(); for (SchedulingPlacementSet ps : schedulerKeyToPlacementSets.values()) { - ResourceRequest request = ps.getResourceRequest(ResourceRequest.ANY); - if (request != null && request.getNumContainers() > 0) { - oldMetrics.decrPendingResources(user, request.getNumContainers(), - request.getCapability()); - newMetrics.incrPendingResources(user, request.getNumContainers(), - request.getCapability()); + PendingAsk ask = ps.getPendingAsk(ResourceRequest.ANY); + if (ask.getCount() > 0) { + oldMetrics.decrPendingResources(user, ask.getCount(), + ask.getPerAllocationResource()); + newMetrics.incrPendingResources(user, ask.getCount(), + ask.getPerAllocationResource()); - Resource delta = Resources.multiply(request.getCapability(), - request.getNumContainers()); + Resource delta = Resources.multiply(ask.getPerAllocationResource(), + ask.getCount()); // Update Queue - queue.decPendingResource(request.getNodeLabelExpression(), delta); - newQueue.incPendingResource(request.getNodeLabelExpression(), delta); + queue.decPendingResource( + ps.getPrimaryRequestedNodePartition(), delta); + newQueue.incPendingResource( + ps.getPrimaryRequestedNodePartition(), delta); } } oldMetrics.moveAppFrom(this); @@ -789,16 +778,16 @@ public class AppSchedulingInfo { this.writeLock.lock(); QueueMetrics metrics = queue.getMetrics(); for (SchedulingPlacementSet ps : schedulerKeyToPlacementSets.values()) { - ResourceRequest request = ps.getResourceRequest(ResourceRequest.ANY); - if (request != null && request.getNumContainers() > 0) { - metrics.decrPendingResources(user, request.getNumContainers(), - request.getCapability()); + PendingAsk ask = ps.getPendingAsk(ResourceRequest.ANY); + if (ask.getCount() > 0) { + metrics.decrPendingResources(user, ask.getCount(), + ask.getPerAllocationResource()); // Update Queue queue.decPendingResource( - request.getNodeLabelExpression(), - Resources.multiply(request.getCapability(), - request.getNumContainers())); + ps.getPrimaryRequestedNodePartition(), + Resources.multiply(ask.getPerAllocationResource(), + ask.getCount())); } } metrics.finishAppAttempt(applicationId, pending, user); @@ -906,4 +895,38 @@ public class AppSchedulingInfo { return (SchedulingPlacementSet) schedulerKeyToPlacementSets.get( schedulerkey); } + + /** + * Can delay to next?. 
+ * + * @param schedulerKey schedulerKey + * @param resourceName resourceName + * + * @return If request exists, return {relaxLocality} + * Otherwise, return true. + */ + public boolean canDelayTo( + SchedulerRequestKey schedulerKey, String resourceName) { + try { + this.readLock.lock(); + SchedulingPlacementSet ps = + schedulerKeyToPlacementSets.get(schedulerKey); + return (ps == null) || ps.canDelayTo(resourceName); + } finally { + this.readLock.unlock(); + } + } + + public boolean acceptNodePartition(SchedulerRequestKey schedulerKey, + String nodePartition, SchedulingMode schedulingMode) { + try { + this.readLock.lock(); + SchedulingPlacementSet ps = + schedulerKeyToPlacementSets.get(schedulerKey); + return (ps != null) && ps.acceptNodePartition(nodePartition, + schedulingMode); + } finally { + this.readLock.unlock(); + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java index 4a8b2da960d..1f6bd1f360f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java @@ -18,7 +18,6 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -78,6 +77,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.Scheduli import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SchedulingPlacementSet; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.SchedulableEntity; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.PendingAsk; import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerContext; import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; import org.apache.hadoop.yarn.state.InvalidStateTransitionException; @@ -283,11 +283,6 @@ public class SchedulerApplicationAttempt implements SchedulableEntity { return appSchedulingInfo.getUser(); } - public Map getResourceRequests( - SchedulerRequestKey schedulerKey) { - return appSchedulingInfo.getResourceRequests(schedulerKey); - } - public Set getPendingRelease() { return this.pendingRelease; } @@ -299,34 +294,28 @@ public class SchedulerApplicationAttempt implements SchedulableEntity { public Collection getSchedulerKeys() { return appSchedulingInfo.getSchedulerKeys(); } - - public ResourceRequest getResourceRequest( + + public PendingAsk getPendingAsk( SchedulerRequestKey schedulerKey, String resourceName) { try { readLock.lock(); - return appSchedulingInfo.getResourceRequest(schedulerKey, resourceName); - } finally { - readLock.unlock(); - } - - } - - public int getTotalRequiredResources( - SchedulerRequestKey schedulerKey) { - try { - readLock.lock(); - ResourceRequest request = - getResourceRequest(schedulerKey, ResourceRequest.ANY); - return request == null ? 
0 : request.getNumContainers(); + return appSchedulingInfo.getPendingAsk(schedulerKey, resourceName); } finally { readLock.unlock(); } } - public Resource getResource(SchedulerRequestKey schedulerKey) { + public int getOutstandingAsksCount(SchedulerRequestKey schedulerKey) { + return getOutstandingAsksCount(schedulerKey, ResourceRequest.ANY); + } + + public int getOutstandingAsksCount(SchedulerRequestKey schedulerKey, + String resourceName) { try { readLock.lock(); - return appSchedulingInfo.getResource(schedulerKey); + SchedulingPlacementSet ps = appSchedulingInfo.getSchedulingPlacementSet( + schedulerKey); + return ps == null ? 0 : ps.getOutstandingAsksCount(resourceName); } finally { readLock.unlock(); } @@ -625,16 +614,13 @@ public class SchedulerApplicationAttempt implements SchedulableEntity { try { readLock.lock(); for (SchedulerRequestKey schedulerKey : getSchedulerKeys()) { - Map requests = getResourceRequests( - schedulerKey); - if (requests != null) { + SchedulingPlacementSet ps = getSchedulingPlacementSet(schedulerKey); + if (ps != null && + ps.getOutstandingAsksCount(ResourceRequest.ANY) > 0) { LOG.debug("showRequests:" + " application=" + getApplicationId() + " headRoom=" + getHeadroom() + " currentConsumption=" + attemptResourceUsage.getUsed().getMemorySize()); - for (ResourceRequest request : requests.values()) { - LOG.debug("showRequests:" + " application=" + getApplicationId() - + " request=" + request); - } + ps.showRequests(); } } } finally { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/AbstractContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/AbstractContainerAllocator.java index 5bb91e2a711..a411f170b57 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/AbstractContainerAllocator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/AbstractContainerAllocator.java @@ -24,6 +24,7 @@ import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppSchedulingInfo; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesLogger; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityDiagnosticConstant; @@ -46,6 +47,7 @@ public abstract class AbstractContainerAllocator { private static final Log LOG = LogFactory.getLog(AbstractContainerAllocator.class); FiCaSchedulerApp application; + AppSchedulingInfo appInfo; final ResourceCalculator rc; final RMContext rmContext; ActivitiesManager activitiesManager; @@ -59,6 +61,8 @@ public abstract class AbstractContainerAllocator { ResourceCalculator rc, RMContext rmContext, ActivitiesManager activitiesManager) { this.application = application; + this.appInfo = + application == null ? 
null : application.getAppSchedulingInfo(); this.rc = rc; this.rmContext = rmContext; this.activitiesManager = activitiesManager; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java index eeb0815b9ec..8078bcd404d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java @@ -39,7 +39,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils; import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityDiagnosticConstant; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesLogger; @@ -54,6 +53,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaS import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSet; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSetUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SchedulingPlacementSet; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.PendingAsk; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; @@ -64,8 +64,6 @@ import org.apache.hadoop.yarn.util.resource.Resources; */ public class RegularContainerAllocator extends AbstractContainerAllocator { private static final Log LOG = LogFactory.getLog(RegularContainerAllocator.class); - - private ResourceRequest lastResourceRequest = null; public RegularContainerAllocator(FiCaSchedulerApp application, ResourceCalculator rc, RMContext rmContext, @@ -103,9 +101,10 @@ public class RegularContainerAllocator extends AbstractContainerAllocator { Priority priority = schedulerKey.getPriority(); FiCaSchedulerNode node = PlacementSetUtils.getSingleNode(ps); - ResourceRequest anyRequest = - application.getResourceRequest(schedulerKey, ResourceRequest.ANY); - if (null == anyRequest) { + PendingAsk offswitchPendingAsk = application.getPendingAsk(schedulerKey, + ResourceRequest.ANY); + + if (offswitchPendingAsk.getCount() <= 0) { ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation( activitiesManager, node, application, priority, ActivityDiagnosticConstant.PRIORITY_SKIPPED_BECAUSE_NULL_ANY_REQUEST); @@ -113,10 +112,10 @@ public class RegularContainerAllocator extends AbstractContainerAllocator { } // Required resource - Resource required = anyRequest.getCapability(); + Resource required = offswitchPendingAsk.getPerAllocationResource(); // Do 
we need containers at this 'priority'? - if (application.getTotalRequiredResources(schedulerKey) <= 0) { + if (application.getOutstandingAsksCount(schedulerKey) <= 0) { ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation( activitiesManager, node, application, priority, ActivityDiagnosticConstant.APPLICATION_PRIORITY_DO_NOT_NEED_RESOURCE); @@ -141,11 +140,9 @@ public class RegularContainerAllocator extends AbstractContainerAllocator { } } - // Is the node-label-expression of this offswitch resource request - // matches the node's label? + // Is the nodePartition of pending request matches the node's partition // If not match, jump to next priority. - if (!SchedulerUtils.checkResourceRequestMatchingNodePartition( - anyRequest.getNodeLabelExpression(), ps.getPartition(), + if (!appInfo.acceptNodePartition(schedulerKey, node.getPartition(), schedulingMode)) { ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation( activitiesManager, node, application, priority, @@ -182,8 +179,11 @@ public class RegularContainerAllocator extends AbstractContainerAllocator { // This is to make sure non-partitioned-resource-request will prefer // to be allocated to non-partitioned nodes int missedNonPartitionedRequestSchedulingOpportunity = 0; - if (anyRequest.getNodeLabelExpression() - .equals(RMNodeLabelsManager.NO_LABEL)) { + // Only do this when request associated with given scheduler key accepts + // NO_LABEL under RESPECT_EXCLUSIVITY mode + if (StringUtils.equals(RMNodeLabelsManager.NO_LABEL, + appInfo.getSchedulingPlacementSet(schedulerKey) + .getPrimaryRequestedNodePartition())) { missedNonPartitionedRequestSchedulingOpportunity = application.addMissedNonPartitionedRequestSchedulingOpportunity( schedulerKey); @@ -264,8 +264,9 @@ public class RegularContainerAllocator extends AbstractContainerAllocator { public float getLocalityWaitFactor( SchedulerRequestKey schedulerKey, int clusterNodes) { // Estimate: Required unique resources (i.e. hosts + racks) - int requiredResources = - Math.max(application.getResourceRequests(schedulerKey).size() - 1, 0); + int requiredResources = Math.max( + application.getSchedulingPlacementSet(schedulerKey) + .getUniqueLocationAsks() - 1, 0); // waitFactor can't be more than '1' // i.e. no point skipping more than clustersize opportunities @@ -287,11 +288,10 @@ public class RegularContainerAllocator extends AbstractContainerAllocator { } // 'Delay' off-switch - ResourceRequest offSwitchRequest = - application.getResourceRequest(schedulerKey, ResourceRequest.ANY); long missedOpportunities = application.getSchedulingOpportunities(schedulerKey); - long requiredContainers = offSwitchRequest.getNumContainers(); + long requiredContainers = application.getOutstandingAsksCount( + schedulerKey); float localityWaitFactor = getLocalityWaitFactor(schedulerKey, rmContext.getScheduler() @@ -304,9 +304,8 @@ public class RegularContainerAllocator extends AbstractContainerAllocator { } // Check if we need containers on this rack - ResourceRequest rackLocalRequest = - application.getResourceRequest(schedulerKey, node.getRackName()); - if (rackLocalRequest == null || rackLocalRequest.getNumContainers() <= 0) { + if (application.getOutstandingAsksCount(schedulerKey, node.getRackName()) + <= 0) { return false; } @@ -321,24 +320,21 @@ public class RegularContainerAllocator extends AbstractContainerAllocator { // Check if we need containers on this host if (type == NodeType.NODE_LOCAL) { // Now check if we need containers on this host... 
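      // A hedged aside, not part of this diff: with the new API the
      // node-local check below reduces to an outstanding-count lookup,
      // equivalent to
      //
      //   PendingAsk nodeLocalAsk = application.getPendingAsk(schedulerKey,
      //       node.getNodeName());
      //   return nodeLocalAsk.getCount() > 0;
      //
      // getPendingAsk() never returns null; an absent request is modeled as
      // PendingAsk.ZERO, whose count is 0 (see AppSchedulingInfo above).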
- ResourceRequest nodeLocalRequest = - application.getResourceRequest(schedulerKey, node.getNodeName()); - if (nodeLocalRequest != null) { - return nodeLocalRequest.getNumContainers() > 0; - } + return application.getOutstandingAsksCount(schedulerKey, + node.getNodeName()) > 0; } return false; } private ContainerAllocation assignNodeLocalContainers( - Resource clusterResource, ResourceRequest nodeLocalResourceRequest, + Resource clusterResource, PendingAsk nodeLocalAsk, FiCaSchedulerNode node, SchedulerRequestKey schedulerKey, RMContainer reservedContainer, SchedulingMode schedulingMode, ResourceLimits currentResoureLimits) { if (canAssign(schedulerKey, node, NodeType.NODE_LOCAL, reservedContainer)) { return assignContainer(clusterResource, node, schedulerKey, - nodeLocalResourceRequest, NodeType.NODE_LOCAL, reservedContainer, + nodeLocalAsk, NodeType.NODE_LOCAL, reservedContainer, schedulingMode, currentResoureLimits); } @@ -350,13 +346,13 @@ public class RegularContainerAllocator extends AbstractContainerAllocator { } private ContainerAllocation assignRackLocalContainers( - Resource clusterResource, ResourceRequest rackLocalResourceRequest, + Resource clusterResource, PendingAsk rackLocalAsk, FiCaSchedulerNode node, SchedulerRequestKey schedulerKey, RMContainer reservedContainer, SchedulingMode schedulingMode, ResourceLimits currentResoureLimits) { if (canAssign(schedulerKey, node, NodeType.RACK_LOCAL, reservedContainer)) { return assignContainer(clusterResource, node, schedulerKey, - rackLocalResourceRequest, NodeType.RACK_LOCAL, reservedContainer, + rackLocalAsk, NodeType.RACK_LOCAL, reservedContainer, schedulingMode, currentResoureLimits); } @@ -368,13 +364,13 @@ public class RegularContainerAllocator extends AbstractContainerAllocator { } private ContainerAllocation assignOffSwitchContainers( - Resource clusterResource, ResourceRequest offSwitchResourceRequest, + Resource clusterResource, PendingAsk offSwitchAsk, FiCaSchedulerNode node, SchedulerRequestKey schedulerKey, RMContainer reservedContainer, SchedulingMode schedulingMode, ResourceLimits currentResoureLimits) { if (canAssign(schedulerKey, node, NodeType.OFF_SWITCH, reservedContainer)) { return assignContainer(clusterResource, node, schedulerKey, - offSwitchResourceRequest, NodeType.OFF_SWITCH, reservedContainer, + offSwitchAsk, NodeType.OFF_SWITCH, reservedContainer, schedulingMode, currentResoureLimits); } @@ -396,12 +392,12 @@ public class RegularContainerAllocator extends AbstractContainerAllocator { NodeType requestLocalityType = null; // Data-local - ResourceRequest nodeLocalResourceRequest = - application.getResourceRequest(schedulerKey, node.getNodeName()); - if (nodeLocalResourceRequest != null) { + PendingAsk nodeLocalAsk = + application.getPendingAsk(schedulerKey, node.getNodeName()); + if (nodeLocalAsk.getCount() > 0) { requestLocalityType = NodeType.NODE_LOCAL; allocation = - assignNodeLocalContainers(clusterResource, nodeLocalResourceRequest, + assignNodeLocalContainers(clusterResource, nodeLocalAsk, node, schedulerKey, reservedContainer, schedulingMode, currentResoureLimits); if (Resources.greaterThan(rc, clusterResource, @@ -412,10 +408,10 @@ public class RegularContainerAllocator extends AbstractContainerAllocator { } // Rack-local - ResourceRequest rackLocalResourceRequest = - application.getResourceRequest(schedulerKey, node.getRackName()); - if (rackLocalResourceRequest != null) { - if (!rackLocalResourceRequest.getRelaxLocality()) { + PendingAsk rackLocalAsk = + application.getPendingAsk(schedulerKey, 
node.getRackName()); + if (rackLocalAsk.getCount() > 0) { + if (!appInfo.canDelayTo(schedulerKey, node.getRackName())) { ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation( activitiesManager, node, application, priority, ActivityDiagnosticConstant.SKIP_PRIORITY_BECAUSE_OF_RELAX_LOCALITY); @@ -427,7 +423,7 @@ public class RegularContainerAllocator extends AbstractContainerAllocator { requestLocalityType; allocation = - assignRackLocalContainers(clusterResource, rackLocalResourceRequest, + assignRackLocalContainers(clusterResource, rackLocalAsk, node, schedulerKey, reservedContainer, schedulingMode, currentResoureLimits); if (Resources.greaterThan(rc, clusterResource, @@ -438,10 +434,10 @@ public class RegularContainerAllocator extends AbstractContainerAllocator { } // Off-switch - ResourceRequest offSwitchResourceRequest = - application.getResourceRequest(schedulerKey, ResourceRequest.ANY); - if (offSwitchResourceRequest != null) { - if (!offSwitchResourceRequest.getRelaxLocality()) { + PendingAsk offSwitchAsk = + application.getPendingAsk(schedulerKey, ResourceRequest.ANY); + if (offSwitchAsk.getCount() > 0) { + if (!appInfo.canDelayTo(schedulerKey, ResourceRequest.ANY)) { ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation( activitiesManager, node, application, priority, ActivityDiagnosticConstant.SKIP_PRIORITY_BECAUSE_OF_RELAX_LOCALITY); @@ -453,7 +449,7 @@ public class RegularContainerAllocator extends AbstractContainerAllocator { requestLocalityType; allocation = - assignOffSwitchContainers(clusterResource, offSwitchResourceRequest, + assignOffSwitchContainers(clusterResource, offSwitchAsk, node, schedulerKey, reservedContainer, schedulingMode, currentResoureLimits); @@ -474,41 +470,25 @@ public class RegularContainerAllocator extends AbstractContainerAllocator { private ContainerAllocation assignContainer(Resource clusterResource, FiCaSchedulerNode node, SchedulerRequestKey schedulerKey, - ResourceRequest request, NodeType type, RMContainer rmContainer, + PendingAsk pendingAsk, NodeType type, RMContainer rmContainer, SchedulingMode schedulingMode, ResourceLimits currentResoureLimits) { Priority priority = schedulerKey.getPriority(); - lastResourceRequest = request; - + if (LOG.isDebugEnabled()) { LOG.debug("assignContainers: node=" + node.getNodeName() + " application=" + application.getApplicationId() + " priority=" + schedulerKey.getPriority() - + " request=" + request + " type=" + type); + + " pendingAsk=" + pendingAsk + " type=" + type); } - // check if the resource request can access the label - if (!SchedulerUtils.checkResourceRequestMatchingNodePartition( - request.getNodeLabelExpression(), node.getPartition(), - schedulingMode)) { - // this is a reserved container, but we cannot allocate it now according - // to label not match. This can be caused by node label changed - // We should un-reserve this container. 
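      // Illustrative note, not part of this diff: the per-request label
      // check removed here is not lost. Earlier in this patch the same
      // question is answered once per scheduler key, along the lines of
      //
      //   if (!appInfo.acceptNodePartition(schedulerKey, node.getPartition(),
      //       schedulingMode)) {
      //     // skip this priority
      //   }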
- ActivitiesLogger.APP.recordAppActivityWithoutAllocation(activitiesManager, - node, application, priority, - ActivityDiagnosticConstant.REQUEST_CAN_NOT_ACCESS_NODE_LABEL, - ActivityState.REJECTED); - return new ContainerAllocation(rmContainer, null, - AllocationState.LOCALITY_SKIPPED); - } - - Resource capability = request.getCapability(); + Resource capability = pendingAsk.getPerAllocationResource(); Resource available = node.getUnallocatedResource(); Resource totalResource = node.getTotalResource(); if (!Resources.lessThanOrEqual(rc, clusterResource, capability, totalResource)) { LOG.warn("Node : " + node.getNodeID() - + " does not have sufficient resource for request : " + request + + " does not have sufficient resource for ask : " + pendingAsk + " node total capability : " + node.getTotalResource()); // Skip this locality request ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation( @@ -600,9 +580,8 @@ public class RegularContainerAllocator extends AbstractContainerAllocator { } } - ContainerAllocation result = - new ContainerAllocation(unreservedContainer, request.getCapability(), - AllocationState.ALLOCATED); + ContainerAllocation result = new ContainerAllocation(unreservedContainer, + pendingAsk.getPerAllocationResource(), AllocationState.ALLOCATED); result.containerNodeType = type; result.setToKillContainers(toKillContainers); return result; @@ -626,9 +605,8 @@ public class RegularContainerAllocator extends AbstractContainerAllocator { } } - ContainerAllocation result = - new ContainerAllocation(null, request.getCapability(), - AllocationState.RESERVED); + ContainerAllocation result = new ContainerAllocation(null, + pendingAsk.getPerAllocationResource(), AllocationState.RESERVED); result.containerNodeType = type; result.setToKillContainers(null); return result; @@ -644,7 +622,7 @@ public class RegularContainerAllocator extends AbstractContainerAllocator { boolean shouldAllocOrReserveNewContainer( SchedulerRequestKey schedulerKey, Resource required) { int requiredContainers = - application.getTotalRequiredResources(schedulerKey); + application.getOutstandingAsksCount(schedulerKey); int reservedContainers = application.getNumReservedContainers(schedulerKey); int starvation = 0; if (reservedContainers > 0) { @@ -699,7 +677,7 @@ public class RegularContainerAllocator extends AbstractContainerAllocator { SchedulerRequestKey schedulerKey, Container container) { // Inform the application RMContainer allocatedContainer = application.allocate(node, schedulerKey, - lastResourceRequest, container); + container); allocationResult.updatedContainer = allocatedContainer; @@ -803,7 +781,7 @@ public class RegularContainerAllocator extends AbstractContainerAllocator { } } else { // pre-check when allocating reserved container - if (application.getTotalRequiredResources(schedulerKey) == 0) { + if (application.getOutstandingAsksCount(schedulerKey) == 0) { // Release return new ContainerAllocation(reservedContainer, null, AllocationState.QUEUE_SKIPPED); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/PendingAsk.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/PendingAsk.java new file mode 100644 index 00000000000..85d8715bb2a --- /dev/null +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/PendingAsk.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.common;
+
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.util.resource.Resources;
+
+/**
+ * {@link PendingAsk} is the class to include minimal information of how much
+ * resource to ask under constraints (e.g. on one host / rack / node-attributes)
+ * , etc.
+ */
+public class PendingAsk {
+  private final Resource perAllocationResource;
+  private final int count;
+  public final static PendingAsk ZERO = new PendingAsk(Resources.none(), 0);
+
+  public PendingAsk(Resource res, int num) {
+    this.perAllocationResource = res;
+    this.count = num;
+  }
+
+  public Resource getPerAllocationResource() {
+    return perAllocationResource;
+  }
+
+  public int getCount() {
+    return count;
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    sb.append("<per-allocation-resource=");
+    sb.append(getPerAllocationResource());
+    sb.append(",repeat=");
+    sb.append(getCount());
+    sb.append(">");
+    return sb.toString();
+  }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
index 809446f23ff..43293350ebb 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
@@ -70,6 +70,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.Placeme
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SchedulingPlacementSet;
 import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
+
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.PendingAsk;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
@@ -206,8 +208,7 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
   }
 
   public RMContainer allocate(FiCaSchedulerNode node,
-      SchedulerRequestKey schedulerKey, ResourceRequest request,
-      Container container) {
+      SchedulerRequestKey
schedulerKey, Container container) { try { readLock.lock(); @@ -217,7 +218,16 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt { // Required sanity check - AM can call 'allocate' to update resource // request without locking the scheduler, hence we need to check - if (getTotalRequiredResources(schedulerKey) <= 0) { + if (getOutstandingAsksCount(schedulerKey) <= 0) { + return null; + } + + SchedulingPlacementSet ps = + appSchedulingInfo.getSchedulingPlacementSet(schedulerKey); + if (null == ps) { + LOG.warn("Failed to get " + SchedulingPlacementSet.class.getName() + + " for application=" + getApplicationId() + " schedulerRequestKey=" + + schedulerKey); return null; } @@ -225,7 +235,7 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt { RMContainer rmContainer = new RMContainerImpl(container, schedulerKey, this.getApplicationAttemptId(), node.getNodeID(), appSchedulingInfo.getUser(), this.rmContext, - request.getNodeLabelExpression()); + ps.getPrimaryRequestedNodePartition()); ((RMContainerImpl) rmContainer).setQueueName(this.getQueueName()); // FIXME, should set when confirmed @@ -694,21 +704,36 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt { return false; } - public synchronized Map getTotalPendingRequestsPerPartition() { + public Map getTotalPendingRequestsPerPartition() { + try { + readLock.lock(); - Map ret = new HashMap(); - Resource res = null; - for (SchedulerRequestKey key : appSchedulingInfo.getSchedulerKeys()) { - ResourceRequest rr = appSchedulingInfo.getResourceRequest(key, "*"); - if ((res = ret.get(rr.getNodeLabelExpression())) == null) { - res = Resources.createResource(0, 0); - ret.put(rr.getNodeLabelExpression(), res); + Map ret = new HashMap<>(); + for (SchedulerRequestKey schedulerKey : appSchedulingInfo + .getSchedulerKeys()) { + SchedulingPlacementSet ps = + appSchedulingInfo.getSchedulingPlacementSet(schedulerKey); + + String nodePartition = ps.getPrimaryRequestedNodePartition(); + Resource res = ret.get(nodePartition); + if (null == res) { + res = Resources.createResource(0); + ret.put(nodePartition, res); + } + + PendingAsk ask = ps.getPendingAsk(ResourceRequest.ANY); + if (ask.getCount() > 0) { + Resources.addTo(res, Resources + .multiply(ask.getPerAllocationResource(), + ask.getCount())); + } } - Resources.addTo(res, - Resources.multiply(rr.getCapability(), rr.getNumContainers())); + return ret; + } finally { + readLock.unlock(); } - return ret; + } public void markContainerForPreemption(ContainerId cont) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java index 0daa8a4a7b4..d7ed7d15f21 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java @@ -18,16 +18,6 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair; -import java.text.DecimalFormat; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import 
java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; - import com.google.common.annotations.VisibleForTesting; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -56,11 +46,22 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.PendingAsk; import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; +import java.text.DecimalFormat; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + /** * Represents an application attempt from the viewpoint of the Fair Scheduler. */ @@ -416,7 +417,7 @@ public class FSAppAttempt extends SchedulerApplicationAttempt } public RMContainer allocate(NodeType type, FSSchedulerNode node, - SchedulerRequestKey schedulerKey, ResourceRequest request, + SchedulerRequestKey schedulerKey, PendingAsk pendingAsk, Container reservedContainer) { RMContainer rmContainer; Container container; @@ -437,13 +438,13 @@ public class FSAppAttempt extends SchedulerApplicationAttempt // Required sanity check - AM can call 'allocate' to update resource // request without locking the scheduler, hence we need to check - if (getTotalRequiredResources(schedulerKey) <= 0) { + if (getOutstandingAsksCount(schedulerKey) <= 0) { return null; } container = reservedContainer; if (container == null) { - container = createContainer(node, request.getCapability(), + container = createContainer(node, pendingAsk.getPerAllocationResource(), schedulerKey); } @@ -459,7 +460,7 @@ public class FSAppAttempt extends SchedulerApplicationAttempt // Update consumption and track allocations List resourceRequestList = appSchedulingInfo.allocate( - type, node, schedulerKey, request, container); + type, node, schedulerKey, container); this.attemptResourceUsage.incUsed(container.getResource()); // Update resource requests related to "request" and store in RMContainer @@ -632,7 +633,7 @@ public class FSAppAttempt extends SchedulerApplicationAttempt * in {@link FSSchedulerNode}.. * return whether reservation was possible with the current threshold limits */ - private boolean reserve(ResourceRequest request, FSSchedulerNode node, + private boolean reserve(Resource perAllocationResource, FSSchedulerNode node, Container reservedContainer, NodeType type, SchedulerRequestKey schedulerKey) { @@ -641,7 +642,7 @@ public class FSAppAttempt extends SchedulerApplicationAttempt " app_id=" + getApplicationId()); if (reservedContainer == null) { reservedContainer = - createContainer(node, request.getCapability(), + createContainer(node, perAllocationResource, schedulerKey); getMetrics().reserveResource(getUser(), reservedContainer.getResource()); @@ -763,8 +764,8 @@ public class FSAppAttempt extends SchedulerApplicationAttempt * * @param node * The node to try placing the container on. - * @param request - * The ResourceRequest we're trying to satisfy. 
+ * @param pendingAsk + * The {@link PendingAsk} we're trying to satisfy. * @param type * The locality of the assignment. * @param reserved @@ -776,11 +777,11 @@ public class FSAppAttempt extends SchedulerApplicationAttempt * made, returns an empty resource. */ private Resource assignContainer( - FSSchedulerNode node, ResourceRequest request, NodeType type, + FSSchedulerNode node, PendingAsk pendingAsk, NodeType type, boolean reserved, SchedulerRequestKey schedulerKey) { // How much does this request need? - Resource capability = request.getCapability(); + Resource capability = pendingAsk.getPerAllocationResource(); // How much does the node have? Resource available = node.getUnallocatedResource(); @@ -794,7 +795,7 @@ public class FSAppAttempt extends SchedulerApplicationAttempt if (Resources.fitsIn(capability, available)) { // Inform the application of the new container for this request RMContainer allocatedContainer = - allocate(type, node, schedulerKey, request, + allocate(type, node, schedulerKey, pendingAsk, reservedContainer); if (allocatedContainer == null) { // Did the application need this resource? @@ -825,8 +826,9 @@ public class FSAppAttempt extends SchedulerApplicationAttempt } // The desired container won't fit here, so reserve - if (isReservable(capability) && - reserve(request, node, reservedContainer, type, schedulerKey)) { + if (isReservable(capability) && reserve( + pendingAsk.getPerAllocationResource(), node, reservedContainer, type, + schedulerKey)) { if (isWaitingForAMContainer()) { updateAMDiagnosticMsg(capability, " exceed the available resources of the node and the request is" @@ -841,7 +843,7 @@ public class FSAppAttempt extends SchedulerApplicationAttempt } if (LOG.isDebugEnabled()) { LOG.debug("Couldn't creating reservation for " + - getName() + ",at priority " + request.getPriority()); + getName() + ",at priority " + schedulerKey.getPriority()); } return Resources.none(); } @@ -852,19 +854,16 @@ public class FSAppAttempt extends SchedulerApplicationAttempt getQueue().getPolicy().getResourceCalculator(), capacity); } - private boolean hasNodeOrRackLocalRequests(SchedulerRequestKey schedulerKey) { - return getResourceRequests(schedulerKey).size() > 1; - } - /** * Whether the AM container for this app is over maxAMShare limit. */ private boolean isOverAMShareLimit() { // Check the AM resource usage for the leaf queue if (!isAmRunning() && !getUnmanagedAM()) { - List ask = appSchedulingInfo.getAllResourceRequests(); - if (ask.isEmpty() || !getQueue().canRunAppAM( - ask.get(0).getCapability())) { + // Return true if we have not ask, or queue is not be able to run app's AM + PendingAsk ask = appSchedulingInfo.getNextPendingAsk(); + if (ask.getCount() == 0 || !getQueue().canRunAppAM( + ask.getPerAllocationResource())) { return true; } } @@ -886,6 +885,11 @@ public class FSAppAttempt extends SchedulerApplicationAttempt // (not scheduled) in order to promote better locality. try { writeLock.lock(); + + // TODO (wandga): All logics in this method should be added to + // SchedulerPlacement#canDelayTo which is independent from scheduler. + // Scheduler can choose to use various/pluggable delay-scheduling + // implementation. for (SchedulerRequestKey schedulerKey : keysToTry) { // Skip it for reserved container, since // we already check it in isValidReservation. 
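For reference, the canDelayTo contract relied on throughout these hunks (added to AppSchedulingInfo earlier in this patch) is: if a request exists at the given resource name, return its relaxLocality flag; otherwise return true. A minimal standalone sketch of that semantics follows; relaxLocalityByName is an invented stand-in for the placement set's per-location request state, not a real YARN structure:

    import java.util.Map;

    /** Hedged sketch mirroring the javadoc of AppSchedulingInfo#canDelayTo. */
    final class CanDelayToSketch {
      static boolean canDelayTo(Map<String, Boolean> relaxLocalityByName,
          String resourceName) {
        Boolean relax = relaxLocalityByName.get(resourceName);
        // No request at this resource name: delaying further is allowed.
        return relax == null || relax;
      }
    }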
@@ -895,14 +899,16 @@ public class FSAppAttempt extends SchedulerApplicationAttempt addSchedulingOpportunity(schedulerKey); - ResourceRequest rackLocalRequest = getResourceRequest(schedulerKey, + PendingAsk rackLocalPendingAsk = getPendingAsk(schedulerKey, node.getRackName()); - ResourceRequest localRequest = getResourceRequest(schedulerKey, + PendingAsk nodeLocalPendingAsk = getPendingAsk(schedulerKey, node.getNodeName()); - if (localRequest != null && !localRequest.getRelaxLocality()) { + if (nodeLocalPendingAsk.getCount() > 0 + && !appSchedulingInfo.canDelayTo(schedulerKey, + node.getNodeName())) { LOG.warn("Relax locality off is not supported on local request: " - + localRequest); + + nodeLocalPendingAsk); } NodeType allowedLocality; @@ -918,23 +924,23 @@ public class FSAppAttempt extends SchedulerApplicationAttempt scheduler.getRackLocalityThreshold()); } - if (rackLocalRequest != null && rackLocalRequest.getNumContainers() != 0 - && localRequest != null && localRequest.getNumContainers() != 0) { + if (rackLocalPendingAsk.getCount() > 0 + && nodeLocalPendingAsk.getCount() > 0) { if (LOG.isTraceEnabled()) { LOG.trace("Assign container on " + node.getNodeName() + " node, assignType: NODE_LOCAL" + ", allowedLocality: " + allowedLocality + ", priority: " + schedulerKey.getPriority() + ", app attempt id: " + this.attemptId); } - return assignContainer(node, localRequest, NodeType.NODE_LOCAL, + return assignContainer(node, nodeLocalPendingAsk, NodeType.NODE_LOCAL, reserved, schedulerKey); } - if (rackLocalRequest != null && !rackLocalRequest.getRelaxLocality()) { + if (!appSchedulingInfo.canDelayTo(schedulerKey, node.getRackName())) { continue; } - if (rackLocalRequest != null && rackLocalRequest.getNumContainers() != 0 + if (rackLocalPendingAsk.getCount() > 0 && (allowedLocality.equals(NodeType.RACK_LOCAL) || allowedLocality .equals(NodeType.OFF_SWITCH))) { if (LOG.isTraceEnabled()) { @@ -943,27 +949,26 @@ public class FSAppAttempt extends SchedulerApplicationAttempt + allowedLocality + ", priority: " + schedulerKey.getPriority() + ", app attempt id: " + this.attemptId); } - return assignContainer(node, rackLocalRequest, NodeType.RACK_LOCAL, + return assignContainer(node, rackLocalPendingAsk, NodeType.RACK_LOCAL, reserved, schedulerKey); } - ResourceRequest offSwitchRequest = getResourceRequest(schedulerKey, + PendingAsk offswitchAsk = getPendingAsk(schedulerKey, ResourceRequest.ANY); - if (offSwitchRequest != null && !offSwitchRequest.getRelaxLocality()) { + if (!appSchedulingInfo.canDelayTo(schedulerKey, ResourceRequest.ANY)) { continue; } - if (offSwitchRequest != null - && offSwitchRequest.getNumContainers() != 0) { - if (!hasNodeOrRackLocalRequests(schedulerKey) || allowedLocality - .equals(NodeType.OFF_SWITCH)) { + if (offswitchAsk.getCount() > 0) { + if (getSchedulingPlacementSet(schedulerKey).getUniqueLocationAsks() + <= 1 || allowedLocality.equals(NodeType.OFF_SWITCH)) { if (LOG.isTraceEnabled()) { LOG.trace("Assign container on " + node.getNodeName() + " node, assignType: OFF_SWITCH" + ", allowedLocality: " + allowedLocality + ", priority: " + schedulerKey.getPriority() + ", app attempt id: " + this.attemptId); } - return assignContainer(node, offSwitchRequest, NodeType.OFF_SWITCH, + return assignContainer(node, offswitchAsk, NodeType.OFF_SWITCH, reserved, schedulerKey); } } @@ -988,29 +993,35 @@ public class FSAppAttempt extends SchedulerApplicationAttempt */ private boolean hasContainerForNode(SchedulerRequestKey key, FSSchedulerNode node) { - ResourceRequest anyRequest = 
getResourceRequest(key, ResourceRequest.ANY); - ResourceRequest rackRequest = getResourceRequest(key, node.getRackName()); - ResourceRequest nodeRequest = getResourceRequest(key, node.getNodeName()); + PendingAsk offswitchAsk = getPendingAsk(key, ResourceRequest.ANY); + Resource resource = offswitchAsk.getPerAllocationResource(); + boolean hasRequestForOffswitch = + offswitchAsk.getCount() > 0; + boolean hasRequestForRack = getOutstandingAsksCount(key, + node.getRackName()) > 0; + boolean hasRequestForNode = getOutstandingAsksCount(key, + node.getNodeName()) > 0; boolean ret = true; if (!(// There must be outstanding requests at the given priority: - anyRequest != null && anyRequest.getNumContainers() > 0 && - // If locality relaxation is turned off at *-level, there must be a - // non-zero request for the node's rack: - (anyRequest.getRelaxLocality() || - (rackRequest != null && rackRequest.getNumContainers() > 0)) && - // If locality relaxation is turned off at rack-level, there must be a - // non-zero request at the node: - (rackRequest == null || rackRequest.getRelaxLocality() || - (nodeRequest != null && nodeRequest.getNumContainers() > 0)) && - // The requested container must be able to fit on the node: - Resources.lessThanOrEqual(RESOURCE_CALCULATOR, null, - anyRequest.getCapability(), node.getRMNode().getTotalCapability()))) { + hasRequestForOffswitch && + // If locality relaxation is turned off at *-level, there must be a + // non-zero request for the node's rack: + (appSchedulingInfo.canDelayTo(key, ResourceRequest.ANY) || + (hasRequestForRack)) && + // If locality relaxation is turned off at rack-level, + // there must be a non-zero request at the node: + (!hasRequestForRack || appSchedulingInfo.canDelayTo(key, + node.getRackName()) || (hasRequestForNode)) && + // The requested container must be able to fit on the node: + Resources.lessThanOrEqual(RESOURCE_CALCULATOR, null, + resource, + node.getRMNode().getTotalCapability()))) { ret = false; - } else if (!getQueue().fitsInMaxShare(anyRequest.getCapability())) { + } else if (!getQueue().fitsInMaxShare(resource)) { // The requested container must fit in queue maximum share if (isWaitingForAMContainer()) { - updateAMDiagnosticMsg(anyRequest.getCapability(), + updateAMDiagnosticMsg(resource, " exceeds current queue or its parents maximum resource allowed)."); } ret = false; @@ -1091,10 +1102,6 @@ public class FSAppAttempt extends SchedulerApplicationAttempt return this.fairshareStarvation; } - ResourceRequest getNextResourceRequest() { - return appSchedulingInfo.getNextResourceRequest(); - } - /** * Helper method that captures if this app is identified to be starved. 
* @return true if the app is starved for fairshare, false otherwise @@ -1174,10 +1181,11 @@ public class FSAppAttempt extends SchedulerApplicationAttempt try { writeLock.lock(); for (SchedulerRequestKey k : getSchedulerKeys()) { - ResourceRequest r = getResourceRequest(k, ResourceRequest.ANY); - if (r != null) { - Resources.multiplyAndAddTo(demand, r.getCapability(), - r.getNumContainers()); + PendingAsk pendingAsk = getPendingAsk(k, ResourceRequest.ANY); + if (pendingAsk.getCount() > 0) { + Resources.multiplyAndAddTo(demand, + pendingAsk.getPerAllocationResource(), + pendingAsk.getCount()); } } } finally { @@ -1189,9 +1197,9 @@ public class FSAppAttempt extends SchedulerApplicationAttempt public Resource assignContainer(FSSchedulerNode node) { if (isOverAMShareLimit()) { if (isWaitingForAMContainer()) { - List ask = appSchedulingInfo.getAllResourceRequests(); - updateAMDiagnosticMsg(ask.get(0).getCapability(), " exceeds maximum " - + "AM resource allowed)."); + PendingAsk amAsk = appSchedulingInfo.getNextPendingAsk(); + updateAMDiagnosticMsg(amAsk.getPerAllocationResource(), + " exceeds maximum AM resource allowed)."); } if (LOG.isDebugEnabled()) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java index 357985795b2..f4324845ca2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java @@ -26,6 +26,8 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SchedulingPlacementSet; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.PendingAsk; import org.apache.hadoop.yarn.util.resource.Resources; import java.util.ArrayList; @@ -90,14 +92,17 @@ class FSPreemptionThread extends Thread { List containers = new ArrayList<>(); // return value // Find the nodes that match the next resource request - ResourceRequest request = starvedApp.getNextResourceRequest(); + SchedulingPlacementSet nextPs = + starvedApp.getAppSchedulingInfo().getFirstSchedulingPlacementSet(); + PendingAsk firstPendingAsk = nextPs.getPendingAsk(ResourceRequest.ANY); // TODO (KK): Should we check other resource requests if we can't match // the first one? 
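    // Side note (illustrative, not in the patch): a PendingAsk pairs the
    // per-allocation Resource with a repeat count, so the aggregate demand
    // behind this first ask could be computed, following the pattern used in
    // AppSchedulingInfo#move above, as
    //
    //   Resource aggregate = Resources.multiply(
    //       firstPendingAsk.getPerAllocationResource(),
    //       firstPendingAsk.getCount());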
- Resource requestCapability = request.getCapability(); + Resource requestCapability = firstPendingAsk.getPerAllocationResource(); + List potentialNodes = scheduler.getNodeTracker().getNodesByResourceName( - request.getResourceName()); + nextPs.getAcceptedResouceNames().next().toString()); // From the potential nodes, pick a node that has enough containers // from apps over their fairshare diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoAppAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoAppAttempt.java index fa617103cc1..d932e0e0898 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoAppAttempt.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoAppAttempt.java @@ -51,8 +51,7 @@ public class FifoAppAttempt extends FiCaSchedulerApp { } public RMContainer allocate(NodeType type, FiCaSchedulerNode node, - SchedulerRequestKey schedulerKey, ResourceRequest request, - Container container) { + SchedulerRequestKey schedulerKey, Container container) { try { writeLock.lock(); @@ -62,15 +61,14 @@ public class FifoAppAttempt extends FiCaSchedulerApp { // Required sanity check - AM can call 'allocate' to update resource // request without locking the scheduler, hence we need to check - if (getTotalRequiredResources(schedulerKey) <= 0) { + if (getOutstandingAsksCount(schedulerKey) <= 0) { return null; } // Create RMContainer RMContainer rmContainer = new RMContainerImpl(container, schedulerKey, this.getApplicationAttemptId(), node.getNodeID(), - appSchedulingInfo.getUser(), this.rmContext, - request.getNodeLabelExpression()); + appSchedulingInfo.getUser(), this.rmContext, node.getPartition()); ((RMContainerImpl) rmContainer).setQueueName(this.getQueueName()); updateAMContainerDiagnostics(AMState.ASSIGNED, null); @@ -83,7 +81,7 @@ public class FifoAppAttempt extends FiCaSchedulerApp { // Update consumption and track allocations List resourceRequestList = appSchedulingInfo.allocate( - type, node, schedulerKey, request, container); + type, node, schedulerKey, container); attemptResourceUsage.incUsed(node.getPartition(), container.getResource()); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java index f4ab9c8aeca..d52f5386ac6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java @@ -18,16 +18,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; -import 
java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ConcurrentSkipListMap; - +import com.google.common.annotations.VisibleForTesting; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate; @@ -90,6 +81,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemoved import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeResourceUpdateSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.PendingAsk; import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.server.utils.Lock; @@ -97,7 +89,15 @@ import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; -import com.google.common.annotations.VisibleForTesting; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentSkipListMap; @LimitedPrivate("yarn") @Evolving @@ -545,35 +545,32 @@ public class FifoScheduler extends private int getMaxAllocatableContainers(FifoAppAttempt application, SchedulerRequestKey schedulerKey, FiCaSchedulerNode node, NodeType type) { - int maxContainers = 0; - - ResourceRequest offSwitchRequest = - application.getResourceRequest(schedulerKey, ResourceRequest.ANY); - if (offSwitchRequest != null) { - maxContainers = offSwitchRequest.getNumContainers(); - } + PendingAsk offswitchAsk = application.getPendingAsk(schedulerKey, + ResourceRequest.ANY); + int maxContainers = offswitchAsk.getCount(); if (type == NodeType.OFF_SWITCH) { return maxContainers; } if (type == NodeType.RACK_LOCAL) { - ResourceRequest rackLocalRequest = - application.getResourceRequest(schedulerKey, node.getRMNode() - .getRackName()); - if (rackLocalRequest == null) { + PendingAsk rackLocalAsk = application.getPendingAsk(schedulerKey, + node.getRackName()); + if (rackLocalAsk.getCount() <= 0) { return maxContainers; } - maxContainers = Math.min(maxContainers, rackLocalRequest.getNumContainers()); + maxContainers = Math.min(maxContainers, + rackLocalAsk.getCount()); } if (type == NodeType.NODE_LOCAL) { - ResourceRequest nodeLocalRequest = - application.getResourceRequest(schedulerKey, node.getRMNode() - .getNodeAddress()); - if (nodeLocalRequest != null) { - maxContainers = Math.min(maxContainers, nodeLocalRequest.getNumContainers()); + PendingAsk nodeLocalAsk = application.getPendingAsk(schedulerKey, + node.getRMNode().getHostName()); + + if (nodeLocalAsk.getCount() > 0) { + maxContainers = Math.min(maxContainers, + nodeLocalAsk.getCount()); } } @@ -611,25 +608,21 @@ public class FifoScheduler extends private int assignNodeLocalContainers(FiCaSchedulerNode node, FifoAppAttempt application, SchedulerRequestKey schedulerKey) { int assignedContainers = 0; - ResourceRequest request = - application.getResourceRequest(schedulerKey, node.getNodeName()); - if (request != null) { + PendingAsk 
nodeLocalAsk = application.getPendingAsk(schedulerKey,
+        node.getNodeName());
+    if (nodeLocalAsk.getCount() > 0) {
       // Don't allocate on this node if we don't need containers on this rack
-      ResourceRequest rackRequest =
-          application.getResourceRequest(schedulerKey,
-              node.getRMNode().getRackName());
-      if (rackRequest == null || rackRequest.getNumContainers() <= 0) {
+      if (application.getOutstandingAsksCount(schedulerKey,
+          node.getRackName()) <= 0) {
         return 0;
       }
-
-      int assignableContainers =
-          Math.min(
-              getMaxAllocatableContainers(application, schedulerKey, node,
-                  NodeType.NODE_LOCAL),
-              request.getNumContainers());
+
+      int assignableContainers = Math.min(
+          getMaxAllocatableContainers(application, schedulerKey, node,
+              NodeType.NODE_LOCAL), nodeLocalAsk.getCount());
       assignedContainers =
-          assignContainer(node, application, schedulerKey,
-              assignableContainers, request, NodeType.NODE_LOCAL);
+          assignContainer(node, application, schedulerKey, assignableContainers,
+              nodeLocalAsk.getPerAllocationResource(), NodeType.NODE_LOCAL);
     }
     return assignedContainers;
   }
@@ -637,25 +630,21 @@ public class FifoScheduler extends
   private int assignRackLocalContainers(FiCaSchedulerNode node,
       FifoAppAttempt application, SchedulerRequestKey schedulerKey) {
     int assignedContainers = 0;
-    ResourceRequest request =
-        application.getResourceRequest(schedulerKey, node.getRMNode()
-            .getRackName());
-    if (request != null) {
+    PendingAsk rackAsk = application.getPendingAsk(schedulerKey,
+        node.getRMNode().getRackName());
+    if (rackAsk.getCount() > 0) {
       // Don't allocate on this rack if the application doesn't need containers
-      ResourceRequest offSwitchRequest =
-          application.getResourceRequest(schedulerKey, ResourceRequest.ANY);
-      if (offSwitchRequest.getNumContainers() <= 0) {
+      if (application.getOutstandingAsksCount(schedulerKey,
+          ResourceRequest.ANY) <= 0) {
         return 0;
       }
-
-      int assignableContainers =
-          Math.min(
-              getMaxAllocatableContainers(application, schedulerKey, node,
-                  NodeType.RACK_LOCAL),
-              request.getNumContainers());
+
+      int assignableContainers =
+          Math.min(getMaxAllocatableContainers(application, schedulerKey, node,
+              NodeType.RACK_LOCAL), rackAsk.getCount());
       assignedContainers =
-          assignContainer(node, application, schedulerKey,
-              assignableContainers, request, NodeType.RACK_LOCAL);
+          assignContainer(node, application, schedulerKey, assignableContainers,
+              rackAsk.getPerAllocationResource(), NodeType.RACK_LOCAL);
     }
     return assignedContainers;
   }
@@ -663,26 +652,26 @@ public class FifoScheduler extends
   private int assignOffSwitchContainers(FiCaSchedulerNode node,
       FifoAppAttempt application, SchedulerRequestKey schedulerKey) {
     int assignedContainers = 0;
-    ResourceRequest request =
-        application.getResourceRequest(schedulerKey, ResourceRequest.ANY);
-    if (request != null) {
+    PendingAsk offswitchAsk = application.getPendingAsk(schedulerKey,
+        ResourceRequest.ANY);
+    if (offswitchAsk.getCount() > 0) {
       assignedContainers =
           assignContainer(node, application, schedulerKey,
-              request.getNumContainers(), request, NodeType.OFF_SWITCH);
+              offswitchAsk.getCount(),
+              offswitchAsk.getPerAllocationResource(), NodeType.OFF_SWITCH);
     }
     return assignedContainers;
   }

   private int assignContainer(FiCaSchedulerNode node, FifoAppAttempt application,
       SchedulerRequestKey schedulerKey, int assignableContainers,
-      ResourceRequest request, NodeType type) {
+      Resource capability, NodeType type) {
     LOG.debug("assignContainers:" +
         " node=" + node.getRMNode().getNodeAddress() +
         " application=" +
application.getApplicationId().getId() + " priority=" + schedulerKey.getPriority().getPriority() + " assignableContainers=" + assignableContainers + - " request=" + request + " type=" + type); - Resource capability = request.getCapability(); + " capability=" + capability + " type=" + type); // TODO: A buggy application with this zero would crash the scheduler. int availableContainers = @@ -708,7 +697,7 @@ public class FifoScheduler extends // Inform the application RMContainer rmContainer = application.allocate(type, node, schedulerKey, - request, container); + container); // Inform the node node.allocateContainer(rmContainer); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalitySchedulingPlacementSet.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalitySchedulingPlacementSet.java index 157518ed1a9..c32246dd283 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalitySchedulingPlacementSet.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalitySchedulingPlacementSet.java @@ -19,12 +19,16 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement; import org.apache.commons.collections.IteratorUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppSchedulingInfo; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.PendingAsk; import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; import java.util.ArrayList; @@ -37,9 +41,14 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; public class LocalitySchedulingPlacementSet implements SchedulingPlacementSet { + private static final Log LOG = + LogFactory.getLog(LocalitySchedulingPlacementSet.class); + private final Map resourceRequestMap = new ConcurrentHashMap<>(); private AppSchedulingInfo appSchedulingInfo; + private volatile String primaryRequestedPartition = + RMNodeLabelsManager.NO_LABEL; private final ReentrantReadWriteLock.ReadLock readLock; private final ReentrantReadWriteLock.WriteLock writeLock; @@ -132,11 +141,14 @@ public class LocalitySchedulingPlacementSet resourceRequestMap.put(resourceName, request); if (resourceName.equals(ResourceRequest.ANY)) { + String partition = request.getNodeLabelExpression() == null ? + RMNodeLabelsManager.NO_LABEL : + request.getNodeLabelExpression(); + + this.primaryRequestedPartition = partition; + //update the applications requested labels set - appSchedulingInfo.addRequestedPartition( - request.getNodeLabelExpression() == null ? 
-            RMNodeLabelsManager.NO_LABEL :
-            request.getNodeLabelExpression());
+      appSchedulingInfo.addRequestedPartition(partition);

       updateResult = new ResourceRequestUpdateResult(lastRequest, request);
     }
@@ -152,11 +164,43 @@
     return resourceRequestMap;
   }

-  @Override
-  public ResourceRequest getResourceRequest(String resourceName) {
+  private ResourceRequest getResourceRequest(String resourceName) {
     return resourceRequestMap.get(resourceName);
   }

+  @Override
+  public PendingAsk getPendingAsk(String resourceName) {
+    try {
+      readLock.lock();
+      ResourceRequest request = getResourceRequest(resourceName);
+      if (null == request) {
+        return PendingAsk.ZERO;
+      } else {
+        return new PendingAsk(request.getCapability(),
+            request.getNumContainers());
+      }
+    } finally {
+      readLock.unlock();
+    }
+
+  }
+
+  @Override
+  public int getOutstandingAsksCount(String resourceName) {
+    try {
+      readLock.lock();
+      ResourceRequest request = getResourceRequest(resourceName);
+      if (null == request) {
+        return 0;
+      } else {
+        return request.getNumContainers();
+      }
+    } finally {
+      readLock.unlock();
+    }
+
+  }
+
   private void decrementOutstanding(SchedulerRequestKey schedulerRequestKey,
       ResourceRequest offSwitchRequest) {
     int numOffSwitchContainers = offSwitchRequest.getNumContainers() - 1;
@@ -281,22 +325,67 @@
     }
   }

+  @Override
+  public boolean canDelayTo(String resourceName) {
+    try {
+      readLock.lock();
+      ResourceRequest request = getResourceRequest(resourceName);
+      return request == null || request.getRelaxLocality();
+    } finally {
+      readLock.unlock();
+    }
+
+  }
+
+  @Override
+  public boolean acceptNodePartition(String nodePartition,
+      SchedulingMode schedulingMode) {
+    // We only look at the node partition (nodePartitionToLookAt) selected
+    // according to schedulingMode and the partition of the node.
+    String nodePartitionToLookAt;
+    if (schedulingMode == SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY) {
+      nodePartitionToLookAt = nodePartition;
+    } else {
+      nodePartitionToLookAt = RMNodeLabelsManager.NO_LABEL;
+    }
+
+    return primaryRequestedPartition.equals(nodePartitionToLookAt);
+  }
+
+  @Override
+  public String getPrimaryRequestedNodePartition() {
+    return primaryRequestedPartition;
+  }
+
+  @Override
+  public int getUniqueLocationAsks() {
+    return resourceRequestMap.size();
+  }
+
+  @Override
+  public void showRequests() {
+    for (ResourceRequest request : resourceRequestMap.values()) {
+      if (request.getNumContainers() > 0) {
+        LOG.debug("\tRequest=" + request);
+      }
+    }
+  }
+
   @Override
   public List<ResourceRequest> allocate(SchedulerRequestKey schedulerKey,
-      NodeType type, SchedulerNode node, ResourceRequest request) {
+      NodeType type, SchedulerNode node) {
     try {
       writeLock.lock();

       List<ResourceRequest> resourceRequests = new ArrayList<>();

-      if (null == request) {
-        if (type == NodeType.NODE_LOCAL) {
-          request = resourceRequestMap.get(node.getNodeName());
-        } else if (type == NodeType.RACK_LOCAL) {
-          request = resourceRequestMap.get(node.getRackName());
-        } else{
-          request = resourceRequestMap.get(ResourceRequest.ANY);
-        }
+      ResourceRequest request;
+      if (type == NodeType.NODE_LOCAL) {
+        request = resourceRequestMap.get(node.getNodeName());
+      } else if (type == NodeType.RACK_LOCAL) {
+        request = resourceRequestMap.get(node.getRackName());
+      } else {
+        request = resourceRequestMap.get(ResourceRequest.ANY);
       }

       if (type == NodeType.NODE_LOCAL) {
@@ -312,4 +401,14 @@
       writeLock.unlock();
     }
   }
+
+  @Override
+  public Iterator<String> getAcceptedResouceNames() {
+    try {
+      readLock.lock();
+      return resourceRequestMap.keySet().iterator();
+    } finally {
+      readLock.unlock();
+    }
+  }
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SchedulingPlacementSet.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SchedulingPlacementSet.java
index 3cf5fa2b5dd..3e0620e5de7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SchedulingPlacementSet.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SchedulingPlacementSet.java
@@ -21,6 +21,8 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.PendingAsk;
 import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;

 import java.util.Collection;
@@ -70,22 +72,38 @@
   Map<String, ResourceRequest> getResourceRequests();

   /**
-   * Get ResourceRequest by given schedulerKey and resourceName
+   * Get the pending ask for a given resourceName.
+   * If there is no such pending ask, returns {@link PendingAsk#ZERO}.
+   *
+   * @param resourceName resourceName
-   * @return ResourceRequest
+   * @return PendingAsk
    */
-  ResourceRequest getResourceRequest(String resourceName);
+  PendingAsk getPendingAsk(String resourceName);
+
+  /**
+   * Get the number of pending allocations for a given resourceName. If there
+   * is no such pending ask, returns 0.
+   *
+   * @param resourceName resourceName
+   * @return #pending-allocations
+   */
+  int getOutstandingAsksCount(String resourceName);

   /**
    * Notify container allocated.
    * @param schedulerKey SchedulerRequestKey for this ResourceRequest
    * @param type Type of the allocation
    * @param node Which node this container was allocated on
-   * @param request Which resource request to allocate
    * @return list of ResourceRequests deducted
    */
   List<ResourceRequest> allocate(SchedulerRequestKey schedulerKey,
-      NodeType type, SchedulerNode node, ResourceRequest request);
+      NodeType type, SchedulerNode node);
+
+  /**
+   * Returns an iterator over the accepted resourceNames.
+   * @return Iterator of accepted resourceNames
+   */
+  Iterator<String> getAcceptedResouceNames();

   /**
    * We can still have a pending requirement for a given NodeType and node.
    * @return true if we have a pending requirement
    */
   boolean canAllocate(NodeType type, SchedulerNode node);
+
+  /**
+   * Can allocation of this ask be delayed in order to achieve locality?
+   * TODO (wangda): This should be moved out of SchedulingPlacementSet
+   * and should belong to specific delay scheduling policy impl.
+   *
+   * @param resourceName resourceName
+   * @return true if allocation for the given resourceName can be delayed
+   */
+  boolean canDelayTo(String resourceName);
+
+  /**
+   * Does this {@link SchedulingPlacementSet} accept resources on nodePartition?
+   *
+   * @param nodePartition nodePartition
+   * @param schedulingMode schedulingMode
+   * @return true if resources on nodePartition are accepted
+   */
+  boolean acceptNodePartition(String nodePartition,
+      SchedulingMode schedulingMode);
+
+  /**
+   * It is possible that one request can accept multiple node partitions,
+   * so this method returns the primary node partition used for pending
+   * resource and headroom calculation.
+   *
+   * @return primary requested node partition
+   */
+  String getPrimaryRequestedNodePartition();
+
+  /**
+   * @return number of unique location asks with #pending greater than 0
+   * (like /rack1, host1, etc.).
+   *
+   * TODO (wangda): This should be moved out of SchedulingPlacementSet
+   * and should belong to specific delay scheduling policy impl.
+   */
+  int getUniqueLocationAsks();
+
+  /**
+   * Print human-readable pending requests to the debug log.
+ */ + void showRequests(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java index 63953397d11..b32f1384a63 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java @@ -74,6 +74,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEv import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType; import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM; import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; +import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.util.resource.Resources; import org.junit.Assert; @@ -588,12 +589,14 @@ public class TestAbstractYarnScheduler extends ParameterizedSchedulerTestBase { // The core part of this test // The killed containers' ResourceRequests are recovered back to the // original app-attempt, not the new one - for (ResourceRequest request : firstSchedulerAppAttempt - .getAppSchedulingInfo().getAllResourceRequests()) { - if (request.getPriority().getPriority() == 0) { - Assert.assertEquals(0, request.getNumContainers()); - } else if (request.getPriority().getPriority() == ALLOCATED_CONTAINER_PRIORITY) { - Assert.assertEquals(1, request.getNumContainers()); + for (SchedulerRequestKey key : firstSchedulerAppAttempt.getSchedulerKeys()) { + if (key.getPriority().getPriority() == 0) { + Assert.assertEquals(0, + firstSchedulerAppAttempt.getOutstandingAsksCount(key)); + } else if (key.getPriority().getPriority() == + ALLOCATED_CONTAINER_PRIORITY) { + Assert.assertEquals(1, + firstSchedulerAppAttempt.getOutstandingAsksCount(key)); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAppSchedulingInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAppSchedulingInfo.java index 468e7605e00..bb29889a854 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAppSchedulingInfo.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAppSchedulingInfo.java @@ -141,7 +141,7 @@ public class TestAppSchedulingInfo { // iterate to verify no ConcurrentModificationException for (SchedulerRequestKey schedulerKey : info.getSchedulerKeys()) { - info.allocate(NodeType.OFF_SWITCH, null, schedulerKey, req1, null); + info.allocate(NodeType.OFF_SWITCH, null, schedulerKey, null); } Assert.assertEquals(1, 
info.getSchedulerKeys().size()); Assert.assertEquals(SchedulerRequestKey.create(req2), @@ -153,7 +153,7 @@ public class TestAppSchedulingInfo { reqs.add(req2); info.updateResourceRequests(reqs, false); info.allocate(NodeType.OFF_SWITCH, null, SchedulerRequestKey.create(req2), - req2, null); + null); Assert.assertEquals(0, info.getSchedulerKeys().size()); req1 = ResourceRequest.newInstance(pri1, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerApplicationAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerApplicationAttempt.java index ff5dc02a062..9a6c8d4e8ac 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerApplicationAttempt.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerApplicationAttempt.java @@ -93,8 +93,7 @@ public class TestSchedulerApplicationAttempt { app.liveContainers.put(container1.getContainerId(), container1); SchedulerNode node = createNode(); app.appSchedulingInfo.allocate(NodeType.OFF_SWITCH, node, - toSchedulerKey(requestedPriority), - request, container1.getContainer()); + toSchedulerKey(requestedPriority), container1.getContainer()); // Reserved container Priority prio1 = Priority.newInstance(1); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java index 43bdc8e40bc..62300988d43 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java @@ -187,7 +187,7 @@ public class TestCapacityScheduler { private ResourceManager resourceManager = null; private RMContext mockContext; - + @Before public void setUp() throws Exception { resourceManager = new ResourceManager() { @@ -198,11 +198,11 @@ public class TestCapacityScheduler { return mgr; } }; - CapacitySchedulerConfiguration csConf + CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration(); setupQueueConfiguration(csConf); YarnConfiguration conf = new YarnConfiguration(csConf); - conf.setClass(YarnConfiguration.RM_SCHEDULER, + conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, ResourceScheduler.class); resourceManager.init(conf); resourceManager.getRMContext().getContainerTokenSecretManager().rollMasterKey(); @@ -262,7 +262,7 @@ public class TestCapacityScheduler { new org.apache.hadoop.yarn.server.resourcemanager.NodeManager( hostName, containerManagerPort, httpPort, rackName, capability, resourceManager); - NodeAddedSchedulerEvent nodeAddEvent1 = + NodeAddedSchedulerEvent nodeAddEvent1 = 
new NodeAddedSchedulerEvent(resourceManager.getRMContext() .getRMNodes().get(nm.getNodeId())); resourceManager.getResourceScheduler().handle(nodeAddEvent1); @@ -273,89 +273,89 @@ public class TestCapacityScheduler { public void testCapacityScheduler() throws Exception { LOG.info("--- START: testCapacityScheduler ---"); - + // Register node1 String host_0 = "host_0"; - org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm_0 = - registerNode(host_0, 1234, 2345, NetworkTopology.DEFAULT_RACK, - Resources.createResource(4 * GB, 1)); - + org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm_0 = + registerNode(host_0, 1234, 2345, NetworkTopology.DEFAULT_RACK, + Resources.createResource(4 * GB, 1)); + // Register node2 String host_1 = "host_1"; - org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm_1 = - registerNode(host_1, 1234, 2345, NetworkTopology.DEFAULT_RACK, - Resources.createResource(2 * GB, 1)); + org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm_1 = + registerNode(host_1, 1234, 2345, NetworkTopology.DEFAULT_RACK, + Resources.createResource(2 * GB, 1)); // ResourceRequest priorities Priority priority_0 = Priority.newInstance(0); Priority priority_1 = Priority.newInstance(1); - + // Submit an application Application application_0 = new Application("user_0", "a1", resourceManager); application_0.submit(); - + application_0.addNodeManager(host_0, 1234, nm_0); application_0.addNodeManager(host_1, 1234, nm_1); Resource capability_0_0 = Resources.createResource(1 * GB, 1); application_0.addResourceRequestSpec(priority_1, capability_0_0); - + Resource capability_0_1 = Resources.createResource(2 * GB, 1); application_0.addResourceRequestSpec(priority_0, capability_0_1); - Task task_0_0 = new Task(application_0, priority_1, + Task task_0_0 = new Task(application_0, priority_1, new String[] {host_0, host_1}); application_0.addTask(task_0_0); - + // Submit another application Application application_1 = new Application("user_1", "b2", resourceManager); application_1.submit(); - + application_1.addNodeManager(host_0, 1234, nm_0); application_1.addNodeManager(host_1, 1234, nm_1); - + Resource capability_1_0 = Resources.createResource(3 * GB, 1); application_1.addResourceRequestSpec(priority_1, capability_1_0); - + Resource capability_1_1 = Resources.createResource(2 * GB, 1); application_1.addResourceRequestSpec(priority_0, capability_1_1); - Task task_1_0 = new Task(application_1, priority_1, + Task task_1_0 = new Task(application_1, priority_1, new String[] {host_0, host_1}); application_1.addTask(task_1_0); - + // Send resource requests to the scheduler application_0.schedule(); application_1.schedule(); // Send a heartbeat to kick the tires on the Scheduler LOG.info("Kick!"); - + // task_0_0 and task_1_0 allocated, used=4G nodeUpdate(nm_0); - + // nothing allocated nodeUpdate(nm_1); - + // Get allocations from the scheduler application_0.schedule(); // task_0_0 checkApplicationResourceUsage(1 * GB, application_0); application_1.schedule(); // task_1_0 checkApplicationResourceUsage(3 * GB, application_1); - + checkNodeResourceUsage(4*GB, nm_0); // task_0_0 (1G) and task_1_0 (3G) checkNodeResourceUsage(0*GB, nm_1); // no tasks, 2G available LOG.info("Adding new tasks..."); - - Task task_1_1 = new Task(application_1, priority_0, + + Task task_1_1 = new Task(application_1, priority_0, new String[] {ResourceRequest.ANY}); application_1.addTask(task_1_1); application_1.schedule(); - Task task_0_1 = new Task(application_0, priority_0, + Task task_0_1 = new 
Task(application_0, priority_0, new String[] {host_0, host_1}); application_0.addTask(task_0_1); @@ -365,11 +365,11 @@ public class TestCapacityScheduler { LOG.info("Sending hb from " + nm_0.getHostName()); // nothing new, used=4G nodeUpdate(nm_0); - + LOG.info("Sending hb from " + nm_1.getHostName()); // task_0_1 is prefer as locality, used=2G nodeUpdate(nm_1); - + // Get allocations from the scheduler LOG.info("Trying to allocate..."); application_0.schedule(); @@ -377,10 +377,10 @@ public class TestCapacityScheduler { application_1.schedule(); checkApplicationResourceUsage(5 * GB, application_1); - + nodeUpdate(nm_0); nodeUpdate(nm_1); - + checkNodeResourceUsage(4*GB, nm_0); checkNodeResourceUsage(2*GB, nm_1); @@ -394,23 +394,23 @@ public class TestCapacityScheduler { NodeUpdateSchedulerEvent nodeUpdate = new NodeUpdateSchedulerEvent(node); resourceManager.getResourceScheduler().handle(nodeUpdate); } - + private CapacitySchedulerConfiguration setupQueueConfiguration( CapacitySchedulerConfiguration conf) { - + // Define top-level queues conf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {"a", "b"}); conf.setCapacity(A, A_CAPACITY); conf.setCapacity(B, B_CAPACITY); - + // Define 2nd-level queues conf.setQueues(A, new String[] {"a1", "a2"}); conf.setCapacity(A1, A1_CAPACITY); conf.setUserLimitFactor(A1, 100.0f); conf.setCapacity(A2, A2_CAPACITY); conf.setUserLimitFactor(A2, 100.0f); - + conf.setQueues(B, new String[] {"b1", "b2", "b3"}); conf.setCapacity(B1, B1_CAPACITY); conf.setUserLimitFactor(B1, 100.0f); @@ -478,8 +478,8 @@ public class TestCapacityScheduler { conf.setMaximumCapacity(A, -1); assertEquals(CapacitySchedulerConfiguration.MAXIMUM_CAPACITY_VALUE,conf.getNonLabeledQueueMaximumCapacity(A),delta); } - - + + @Test public void testRefreshQueues() throws Exception { CapacityScheduler cs = new CapacityScheduler(); @@ -564,11 +564,11 @@ public class TestCapacityScheduler { return null; } - private void checkApplicationResourceUsage(int expected, + private void checkApplicationResourceUsage(int expected, Application application) { Assert.assertEquals(expected, application.getUsedResources().getMemorySize()); } - + private void checkNodeResourceUsage(int expected, org.apache.hadoop.yarn.server.resourcemanager.NodeManager node) { Assert.assertEquals(expected, node.getUsed().getMemorySize()); @@ -649,7 +649,7 @@ public class TestCapacityScheduler { // Add a new queue b4 String B4 = B + ".b4"; float B4_CAPACITY = 10; - + B3_CAPACITY -= B4_CAPACITY; try { conf.setCapacity(A, 80f); @@ -661,7 +661,7 @@ public class TestCapacityScheduler { conf.setCapacity(B4, B4_CAPACITY); cs.reinitialize(conf,mockContext); checkQueueCapacities(cs, 80f, 20f); - + // Verify parent for B4 CSQueue rootQueue = cs.getRootQueue(); CSQueue queueB = findQueue(rootQueue, B); @@ -879,7 +879,7 @@ public class TestCapacityScheduler { ResourceScheduler.class); MockRM rm = new MockRM(conf); rm.start(); - + MockNM nm1 = rm.registerNode("127.0.0.1:1234", 4 * GB); RMApp app1 = rm.submitApp(2048); // kick the scheduling, 2 GB given to AM1, remaining 2GB on nm1 @@ -909,7 +909,7 @@ public class TestCapacityScheduler { Assert.assertEquals(1, allocated1.size()); Assert.assertEquals(2 * GB, allocated1.get(0).getResource().getMemorySize()); Assert.assertEquals(nm1.getNodeId(), allocated1.get(0).getNodeId()); - + report_nm1 = rm.getResourceScheduler().getNodeReport(nm1.getNodeId()); // check node report, 4 GB used and 0 GB available Assert.assertEquals(0, report_nm1.getAvailableResource().getMemorySize()); @@ -918,13 
+918,13 @@ public class TestCapacityScheduler { // check container is assigned with 2 GB. Container c1 = allocated1.get(0); Assert.assertEquals(2 * GB, c1.getResource().getMemorySize()); - + // update node resource to 2 GB, so resource is over-consumed. - Map nodeResourceMap = + Map nodeResourceMap = new HashMap(); - nodeResourceMap.put(nm1.getNodeId(), + nodeResourceMap.put(nm1.getNodeId(), ResourceOption.newInstance(Resource.newInstance(2 * GB, 1), -1)); - UpdateNodeResourceRequest request = + UpdateNodeResourceRequest request = UpdateNodeResourceRequest.newInstance(nodeResourceMap); AdminService as = ((MockRM)rm).getAdminService(); as.updateNodeResource(request); @@ -943,7 +943,7 @@ public class TestCapacityScheduler { report_nm1 = rm.getResourceScheduler().getNodeReport(nm1.getNodeId()); Assert.assertEquals(4 * GB, report_nm1.getUsedResource().getMemorySize()); Assert.assertEquals(-2 * GB, report_nm1.getAvailableResource().getMemorySize()); - + // Check container can complete successfully in case of resource over-commitment. ContainerStatus containerStatus = BuilderUtils.newContainerStatus( c1.getId(), ContainerState.COMPLETE, "", 0, c1.getResource()); @@ -961,7 +961,7 @@ public class TestCapacityScheduler { Assert.assertEquals(2 * GB, report_nm1.getUsedResource().getMemorySize()); // As container return 2 GB back, the available resource becomes 0 again. Assert.assertEquals(0 * GB, report_nm1.getAvailableResource().getMemorySize()); - + // Verify no NPE is trigger in schedule after resource is updated. am1.addRequests(new String[] { "127.0.0.1", "127.0.0.2" }, 3 * GB, 1, 1); alloc1Response = am1.schedule(); @@ -979,7 +979,7 @@ public class TestCapacityScheduler { 0, alloc1Response.getAllocatedContainers().size()); rm.stop(); } - + @Test public void testGetAppsInQueue() throws Exception { Application application_0 = new Application("user_0", "a1", resourceManager); @@ -1027,7 +1027,7 @@ public class TestCapacityScheduler { cs.getSchedulerApplications(), cs, "a1"); Assert.assertEquals("a1", app.getQueue().getQueueName()); } - + @Test public void testAsyncScheduling() throws Exception { Configuration conf = new Configuration(); @@ -1038,7 +1038,7 @@ public class TestCapacityScheduler { CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler(); final int NODES = 100; - + // Register nodes for (int i=0; i < NODES; ++i) { String host = "192.168.1." 
+ i; @@ -1046,7 +1046,7 @@ public class TestCapacityScheduler { MockNodes.newNodeInfo(0, MockNodes.newResource(4 * GB), 1, host); cs.handle(new NodeAddedSchedulerEvent(node)); } - + // Now directly exercise the scheduling loop for (int i=0; i < NODES; ++i) { CapacityScheduler.schedule(cs); @@ -1068,7 +1068,7 @@ public class TestCapacityScheduler { && attemptPM.getResourcePreempted().equals(currentAttemptPreempted) && app.getCurrentAppAttempt().getRMAppAttemptMetrics() .getIsPreempted() == currentAttemptAMPreempted - && attemptPM.getNumNonAMContainersPreempted() == + && attemptPM.getNumNonAMContainersPreempted() == numLatestAttemptTaskPreempted) { return; } @@ -1082,7 +1082,7 @@ public class TestCapacityScheduler { Thread.sleep(500); } } - + @Test(timeout = 30000) public void testAllocateDoesNotBlockOnSchedulerLock() throws Exception { final YarnConfiguration conf = new YarnConfiguration(); @@ -1301,7 +1301,7 @@ public class TestCapacityScheduler { rm1.stop(); } - + @Test(timeout = 300000) public void testRecoverRequestAfterPreemption() throws Exception { Configuration conf = new Configuration(); @@ -1335,8 +1335,9 @@ public class TestCapacityScheduler { // Already the node local resource request is cleared from RM after // allocation. - Assert.assertNull(app.getResourceRequest( - SchedulerRequestKey.create(request), request.getResourceName())); + Assert.assertEquals(0, + app.getOutstandingAsksCount(SchedulerRequestKey.create(request), + request.getResourceName())); } // Call killContainer to preempt the container @@ -1346,10 +1347,9 @@ public class TestCapacityScheduler { for (ResourceRequest request : requests) { // Resource request must have added back in RM after preempt event // handling. - Assert.assertEquals( - 1, - app.getResourceRequest(SchedulerRequestKey.create(request), - request.getResourceName()).getNumContainers()); + Assert.assertEquals(1, + app.getOutstandingAsksCount(SchedulerRequestKey.create(request), + request.getResourceName())); } // New container will be allocated and will move to ALLOCATED state @@ -2617,7 +2617,7 @@ public class TestCapacityScheduler { assertEquals("queue B2 max vcores allocation", 12, ((LeafQueue) queueB2).getMaximumAllocation().getVirtualCores()); } - + private void waitContainerAllocated(MockAM am, int mem, int nContainer, int startContainerId, MockRM rm, MockNM nm) throws Exception { for (int cId = startContainerId; cId < startContainerId + nContainer; cId++) { @@ -2651,44 +2651,44 @@ public class TestCapacityScheduler { MockNM nm1 = new MockNM("127.0.0.1:1234", 100 * GB, rm1.getResourceTrackerService()); nm1.registerNode(); - + RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "b1"); MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); - + waitContainerAllocated(am1, 1 * GB, 1, 2, rm1, nm1); // Maximum resoure of b1 is 100 * 0.895 * 0.792 = 71 GB // 2 GBs used by am, so it's 71 - 2 = 69G. 
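To spell out the arithmetic in the comment above (reading the two factors as the absolute maximum-capacity fractions of queue b and leaf queue b1, which the comment implies): 100 GB x 0.895 x 0.792 = 70.884 GB, reported as the 71 GB maximum for b1; subtracting the 2 GB already used by am1 (its 1 GB AM container plus one allocated 1 GB container) leaves the asserted 69 GB of available headroom.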
Assert.assertEquals(69 * GB, am1.doHeartbeat().getAvailableResources().getMemorySize()); - + RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "b2"); MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm1); - + // Allocate 5 containers, each one is 8 GB in am2 (40 GB in total) waitContainerAllocated(am2, 8 * GB, 5, 2, rm1, nm1); - + // Allocated one more container with 1 GB resource in b1 waitContainerAllocated(am1, 1 * GB, 1, 3, rm1, nm1); - + // Total is 100 GB, // B2 uses 41 GB (5 * 8GB containers and 1 AM container) // B1 uses 3 GB (2 * 1GB containers and 1 AM container) // Available is 100 - 41 - 3 = 56 GB Assert.assertEquals(56 * GB, am1.doHeartbeat().getAvailableResources().getMemorySize()); - + // Now we submit app3 to a1 (in higher level hierarchy), to see if headroom // of app1 (in queue b1) updated correctly RMApp app3 = rm1.submitApp(1 * GB, "app", "user", null, "a1"); MockAM am3 = MockRM.launchAndRegisterAM(app3, rm1, nm1); - + // Allocate 3 containers, each one is 8 GB in am3 (24 GB in total) waitContainerAllocated(am3, 8 * GB, 3, 2, rm1, nm1); - + // Allocated one more container with 4 GB resource in b1 waitContainerAllocated(am1, 1 * GB, 1, 4, rm1, nm1); - + // Total is 100 GB, // B2 uses 41 GB (5 * 8GB containers and 1 AM container) // B1 uses 4 GB (3 * 1GB containers and 1 AM container) @@ -2697,7 +2697,7 @@ public class TestCapacityScheduler { Assert.assertEquals(30 * GB, am1.doHeartbeat().getAvailableResources().getMemorySize()); } - + @Test public void testParentQueueMaxCapsAreRespected() throws Exception { /* @@ -2713,7 +2713,7 @@ public class TestCapacityScheduler { csConf.setCapacity(A, 50); csConf.setMaximumCapacity(A, 50); csConf.setCapacity(B, 50); - + // Define 2nd-level queues csConf.setQueues(A, new String[] {"a1", "a2"}); csConf.setCapacity(A1, 50); @@ -2722,7 +2722,7 @@ public class TestCapacityScheduler { csConf.setUserLimitFactor(A2, 100.0f); csConf.setCapacity(B1, B1_CAPACITY); csConf.setUserLimitFactor(B1, 100.0f); - + YarnConfiguration conf = new YarnConfiguration(csConf); conf.setBoolean(CapacitySchedulerConfiguration.ENABLE_USER_METRICS, true); @@ -2733,12 +2733,12 @@ public class TestCapacityScheduler { MockNM nm1 = new MockNM("127.0.0.1:1234", 24 * GB, rm1.getResourceTrackerService()); nm1.registerNode(); - + // Launch app1 in a1, resource usage is 1GB (am) + 4GB * 2 = 9GB RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a1"); MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); waitContainerAllocated(am1, 4 * GB, 2, 2, rm1, nm1); - + // Try to launch app2 in a2, asked 2GB, should success RMApp app2 = rm1.submitApp(2 * GB, "app", "user", null, "a2"); MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm1); @@ -2755,24 +2755,24 @@ public class TestCapacityScheduler { Assert.fail("Shouldn't successfully allocate containers for am2, " + "queue-a's max capacity will be violated if container allocated"); } - + @SuppressWarnings("unchecked") private Set toSet(E... 
elements) { Set set = Sets.newHashSet(elements); return set; } - + @Test public void testQueueHierarchyPendingResourceUpdate() throws Exception { Configuration conf = TestUtils.getConfigurationWithQueueLabels(new Configuration(false)); conf.setBoolean(YarnConfiguration.NODE_LABELS_ENABLED, true); - + final RMNodeLabelsManager mgr = new NullRMNodeLabelsManager(); mgr.init(conf); mgr.addToCluserNodeLabelsWithDefaultExclusivity(ImmutableSet.of("x", "y")); mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x"))); - + MemoryRMStateStore memStore = new MemoryRMStateStore(); memStore.init(conf); MockRM rm = new MockRM(conf, memStore) { @@ -2780,74 +2780,74 @@ public class TestCapacityScheduler { return mgr; } }; - + rm.start(); MockNM nm1 = // label = x new MockNM("h1:1234", 200 * GB, rm.getResourceTrackerService()); nm1.registerNode(); - + MockNM nm2 = // label = "" new MockNM("h2:1234", 200 * GB, rm.getResourceTrackerService()); nm2.registerNode(); - + // Launch app1 in queue=a1 RMApp app1 = rm.submitApp(1 * GB, "app", "user", null, "a1"); MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm2); - + // Launch app2 in queue=b1 RMApp app2 = rm.submitApp(8 * GB, "app", "user", null, "b1"); MockAM am2 = MockRM.launchAndRegisterAM(app2, rm, nm2); - + // am1 asks for 8 * 1GB container for no label am1.allocate(Arrays.asList(ResourceRequest.newInstance( Priority.newInstance(1), "*", Resources.createResource(1 * GB), 8)), null); - + checkPendingResource(rm, "a1", 8 * GB, null); checkPendingResource(rm, "a", 8 * GB, null); checkPendingResource(rm, "root", 8 * GB, null); - + // am2 asks for 8 * 1GB container for no label am2.allocate(Arrays.asList(ResourceRequest.newInstance( Priority.newInstance(1), "*", Resources.createResource(1 * GB), 8)), null); - + checkPendingResource(rm, "a1", 8 * GB, null); checkPendingResource(rm, "a", 8 * GB, null); checkPendingResource(rm, "b1", 8 * GB, null); checkPendingResource(rm, "b", 8 * GB, null); // root = a + b checkPendingResource(rm, "root", 16 * GB, null); - + // am2 asks for 8 * 1GB container in another priority for no label am2.allocate(Arrays.asList(ResourceRequest.newInstance( Priority.newInstance(2), "*", Resources.createResource(1 * GB), 8)), null); - + checkPendingResource(rm, "a1", 8 * GB, null); checkPendingResource(rm, "a", 8 * GB, null); checkPendingResource(rm, "b1", 16 * GB, null); checkPendingResource(rm, "b", 16 * GB, null); // root = a + b checkPendingResource(rm, "root", 24 * GB, null); - + // am1 asks 4 GB resource instead of 8 * GB for priority=1 am1.allocate(Arrays.asList(ResourceRequest.newInstance( Priority.newInstance(1), "*", Resources.createResource(4 * GB), 1)), null); - + checkPendingResource(rm, "a1", 4 * GB, null); checkPendingResource(rm, "a", 4 * GB, null); checkPendingResource(rm, "b1", 16 * GB, null); checkPendingResource(rm, "b", 16 * GB, null); // root = a + b checkPendingResource(rm, "root", 20 * GB, null); - + // am1 asks 8 * GB resource which label=x am1.allocate(Arrays.asList(ResourceRequest.newInstance( Priority.newInstance(2), "*", Resources.createResource(8 * GB), 1, true, "x")), null); - + checkPendingResource(rm, "a1", 4 * GB, null); checkPendingResource(rm, "a", 4 * GB, null); checkPendingResource(rm, "a1", 8 * GB, "x"); @@ -2857,7 +2857,7 @@ public class TestCapacityScheduler { // root = a + b checkPendingResource(rm, "root", 20 * GB, null); checkPendingResource(rm, "root", 8 * GB, "x"); - + // some containers allocated for am1, pending resource should decrease ContainerId containerId = 
ContainerId.newContainerId(am1.getApplicationAttemptId(), 2); @@ -2866,7 +2866,7 @@ public class TestCapacityScheduler { containerId = ContainerId.newContainerId(am1.getApplicationAttemptId(), 3); Assert.assertTrue(rm.waitForState(nm2, containerId, RMContainerState.ALLOCATED)); - + checkPendingResource(rm, "a1", 0 * GB, null); checkPendingResource(rm, "a", 0 * GB, null); checkPendingResource(rm, "a1", 0 * GB, "x"); @@ -2878,23 +2878,23 @@ public class TestCapacityScheduler { // root = a + b checkPendingResourceGreaterThanZero(rm, "root", null); checkPendingResource(rm, "root", 0 * GB, "x"); - + // complete am2, pending resource should be 0 now AppAttemptRemovedSchedulerEvent appRemovedEvent = new AppAttemptRemovedSchedulerEvent( am2.getApplicationAttemptId(), RMAppAttemptState.FINISHED, false); rm.getResourceScheduler().handle(appRemovedEvent); - + checkPendingResource(rm, "a1", 0 * GB, null); checkPendingResource(rm, "a", 0 * GB, null); checkPendingResource(rm, "a1", 0 * GB, "x"); - checkPendingResource(rm, "a", 0 * GB, "x"); + checkPendingResource(rm, "a", 0 * GB, "x"); checkPendingResource(rm, "b1", 0 * GB, null); checkPendingResource(rm, "b", 0 * GB, null); checkPendingResource(rm, "root", 0 * GB, null); checkPendingResource(rm, "root", 0 * GB, "x"); } - + private void checkPendingResource(MockRM rm, String queueName, int memory, String label) { CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler(); @@ -2932,10 +2932,10 @@ public class TestCapacityScheduler { Resource minAllocResource = Resource.newInstance(minAllocMb, 1); String queueName = "a1"; RMApp rmApp = rm.submitApp(amMemory, "app-1", "user_0", null, queueName); - + assertEquals("RMApp does not containes minimum allocation", minAllocResource, rmApp.getAMResourceRequest().getCapability()); - + ResourceScheduler scheduler = rm.getRMContext().getScheduler(); LeafQueue queueA = (LeafQueue) ((CapacityScheduler) scheduler).getQueue(queueName); @@ -3164,7 +3164,7 @@ public class TestCapacityScheduler { DominantResourceCalculator.class.getName()); verifyAMLimitForLeafQueue(config); } - + private FiCaSchedulerApp getFiCaSchedulerApp(MockRM rm, ApplicationId appId) { CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler(); @@ -3177,10 +3177,10 @@ public class TestCapacityScheduler { Configuration conf = TestUtils.getConfigurationWithQueueLabels(new Configuration(false)); conf.setBoolean(YarnConfiguration.NODE_LABELS_ENABLED, true); - + final RMNodeLabelsManager mgr = new NullRMNodeLabelsManager(); mgr.init(conf); - + MemoryRMStateStore memStore = new MemoryRMStateStore(); memStore.init(conf); MockRM rm = new MockRM(conf, memStore) { @@ -3188,17 +3188,17 @@ public class TestCapacityScheduler { return mgr; } }; - + rm.start(); - + MockNM nm1 = // label = "" new MockNM("h1:1234", 200 * GB, rm.getResourceTrackerService()); nm1.registerNode(); - + // Launch app1 in queue=a1 RMApp app1 = rm.submitApp(1 * GB, "app", "user", null, "a1"); MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1); - + // Allocate two more containers am1.allocate( Arrays.asList(ResourceRequest.newInstance(Priority.newInstance(1), @@ -3227,15 +3227,15 @@ public class TestCapacityScheduler { .newInstance(0, containerId1, ContainerUpdateType.INCREASE_RESOURCE, Resources.createResource(3 * GB), null))); - + FiCaSchedulerApp app = getFiCaSchedulerApp(rm, app1.getApplicationId()); - + Assert.assertEquals(2 * GB, app.getAppAttemptResourceUsage().getPending().getMemorySize()); checkPendingResource(rm, "a1", 2 * GB, null); checkPendingResource(rm, 
"a", 2 * GB, null); checkPendingResource(rm, "root", 2 * GB, null); - + // am1 asks to change containerId2 (2G -> 3G) and containerId3 (2G -> 5G) am1.sendContainerResizingRequest(Arrays.asList( UpdateContainerRequest @@ -3246,13 +3246,13 @@ public class TestCapacityScheduler { .newInstance(0, containerId3, ContainerUpdateType.INCREASE_RESOURCE, Resources.createResource(5 * GB), null))); - + Assert.assertEquals(6 * GB, app.getAppAttemptResourceUsage().getPending().getMemorySize()); checkPendingResource(rm, "a1", 6 * GB, null); checkPendingResource(rm, "a", 6 * GB, null); checkPendingResource(rm, "root", 6 * GB, null); - + // am1 asks to change containerId1 (1G->3G), containerId2 (2G -> 4G) and // containerId3 (2G -> 2G) am1.sendContainerResizingRequest(Arrays.asList( @@ -3335,7 +3335,7 @@ public class TestCapacityScheduler { + CapacitySchedulerConfiguration.MAXIMUM_ALLOCATION_VCORES; conf.setInt(propName, maxAllocVcores); } - + private void sentRMContainerLaunched(MockRM rm, ContainerId containerId) { CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler(); RMContainer rmContainer = cs.getRMContainer(containerId); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java index e49532848c6..b2695bc17e0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java @@ -23,7 +23,6 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyBoolean; -import static org.mockito.Matchers.anyString; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; @@ -1056,9 +1055,13 @@ public class TestLeafQueue { //test case 3 qb.finishApplication(app_0.getApplicationId(), user_0); qb.finishApplication(app_2.getApplicationId(), user_1); - qb.releaseResource(clusterResource, app_0, app_0.getResource(u0SchedKey), + qb.releaseResource(clusterResource, app_0, + app_0.getAppSchedulingInfo().getPendingAsk(u0SchedKey) + .getPerAllocationResource(), null, null, false); - qb.releaseResource(clusterResource, app_2, app_2.getResource(u1SchedKey), + qb.releaseResource(clusterResource, app_2, + app_2.getAppSchedulingInfo().getPendingAsk(u1SchedKey) + .getPerAllocationResource(), null, null, false); qb.setUserLimit(50); @@ -1956,7 +1959,7 @@ public class TestLeafQueue { applyCSAssignment(clusterResource, assignment, a, nodes, apps); verifyNoContainerAllocated(assignment); assertEquals(1, app_0.getSchedulingOpportunities(schedulerKey)); - assertEquals(3, app_0.getTotalRequiredResources(schedulerKey)); + assertEquals(3, app_0.getOutstandingAsksCount(schedulerKey)); assertEquals(NodeType.NODE_LOCAL, assignment.getType()); // None->NODE_LOCAL // Another off switch, shouldn't allocate due to delay scheduling @@ -1965,7 +1968,7 @@ public class TestLeafQueue { applyCSAssignment(clusterResource, 
assignment, a, nodes, apps); verifyNoContainerAllocated(assignment); assertEquals(2, app_0.getSchedulingOpportunities(schedulerKey)); - assertEquals(3, app_0.getTotalRequiredResources(schedulerKey)); + assertEquals(3, app_0.getOutstandingAsksCount(schedulerKey)); assertEquals(NodeType.NODE_LOCAL, assignment.getType()); // None->NODE_LOCAL // Another off switch, shouldn't allocate due to delay scheduling @@ -1974,7 +1977,7 @@ public class TestLeafQueue { applyCSAssignment(clusterResource, assignment, a, nodes, apps); verifyNoContainerAllocated(assignment); assertEquals(3, app_0.getSchedulingOpportunities(schedulerKey)); - assertEquals(3, app_0.getTotalRequiredResources(schedulerKey)); + assertEquals(3, app_0.getOutstandingAsksCount(schedulerKey)); assertEquals(NodeType.NODE_LOCAL, assignment.getType()); // None->NODE_LOCAL // Another off switch, now we should allocate @@ -1985,7 +1988,7 @@ public class TestLeafQueue { verifyContainerAllocated(assignment, NodeType.OFF_SWITCH); // should NOT reset assertEquals(4, app_0.getSchedulingOpportunities(schedulerKey)); - assertEquals(2, app_0.getTotalRequiredResources(schedulerKey)); + assertEquals(2, app_0.getOutstandingAsksCount(schedulerKey)); // NODE_LOCAL - node_0 assignment = a.assignContainers(clusterResource, node_0, @@ -1994,7 +1997,7 @@ public class TestLeafQueue { verifyContainerAllocated(assignment, NodeType.NODE_LOCAL); // should reset assertEquals(0, app_0.getSchedulingOpportunities(schedulerKey)); - assertEquals(1, app_0.getTotalRequiredResources(schedulerKey)); + assertEquals(1, app_0.getOutstandingAsksCount(schedulerKey)); // NODE_LOCAL - node_1 assignment = a.assignContainers(clusterResource, node_1, @@ -2003,7 +2006,7 @@ public class TestLeafQueue { verifyContainerAllocated(assignment, NodeType.NODE_LOCAL); // should reset assertEquals(0, app_0.getSchedulingOpportunities(schedulerKey)); - assertEquals(0, app_0.getTotalRequiredResources(schedulerKey)); + assertEquals(0, app_0.getOutstandingAsksCount(schedulerKey)); assertEquals(NodeType.NODE_LOCAL, assignment.getType()); // Add 1 more request to check for RACK_LOCAL @@ -2018,7 +2021,7 @@ public class TestLeafQueue { TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 4, // one extra true, priority, recordFactory)); app_0.updateResourceRequests(app_0_requests_0); - assertEquals(4, app_0.getTotalRequiredResources(schedulerKey)); + assertEquals(4, app_0.getOutstandingAsksCount(schedulerKey)); // Rack-delay doReturn(true).when(a).getRackLocalityFullReset(); @@ -2029,7 +2032,7 @@ public class TestLeafQueue { new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); applyCSAssignment(clusterResource, assignment, a, nodes, apps); assertEquals(1, app_0.getSchedulingOpportunities(schedulerKey)); - assertEquals(4, app_0.getTotalRequiredResources(schedulerKey)); + assertEquals(4, app_0.getOutstandingAsksCount(schedulerKey)); // Should assign RACK_LOCAL now assignment = a.assignContainers(clusterResource, node_3, @@ -2038,14 +2041,14 @@ public class TestLeafQueue { verifyContainerAllocated(assignment, NodeType.RACK_LOCAL); // should reset assertEquals(0, app_0.getSchedulingOpportunities(schedulerKey)); - assertEquals(3, app_0.getTotalRequiredResources(schedulerKey)); + assertEquals(3, app_0.getOutstandingAsksCount(schedulerKey)); // Shouldn't assign RACK_LOCAL because schedulingOpportunities should have gotten reset. 
     assignment = a.assignContainers(clusterResource, node_3,
         new ResourceLimits(clusterResource),
         SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     applyCSAssignment(clusterResource, assignment, a, nodes, apps);
     assertEquals(1, app_0.getSchedulingOpportunities(schedulerKey));
-    assertEquals(3, app_0.getTotalRequiredResources(schedulerKey));
+    assertEquals(3, app_0.getOutstandingAsksCount(schedulerKey));
 
     // Next time we schedule RACK_LOCAL, don't reset
     doReturn(false).when(a).getRackLocalityFullReset();
@@ -2057,7 +2060,7 @@ public class TestLeafQueue {
     verifyContainerAllocated(assignment, NodeType.RACK_LOCAL);
     // should NOT reset
     assertEquals(2, app_0.getSchedulingOpportunities(schedulerKey));
-    assertEquals(2, app_0.getTotalRequiredResources(schedulerKey));
+    assertEquals(2, app_0.getOutstandingAsksCount(schedulerKey));
 
     // Another RACK_LOCAL since schedulingOpportunities not reset
     assignment = a.assignContainers(clusterResource, node_3,
@@ -2066,7 +2069,7 @@ public class TestLeafQueue {
     verifyContainerAllocated(assignment, NodeType.RACK_LOCAL);
     // should NOT reset
     assertEquals(3, app_0.getSchedulingOpportunities(schedulerKey));
-    assertEquals(1, app_0.getTotalRequiredResources(schedulerKey));
+    assertEquals(1, app_0.getOutstandingAsksCount(schedulerKey));
 
     // Add a request larger than cluster size to verify
     // OFF_SWITCH delay is capped by cluster size
@@ -2185,9 +2188,9 @@ public class TestLeafQueue {
     applyCSAssignment(clusterResource, assignment, a, nodes, apps);
     verifyNoContainerAllocated(assignment);
     assertEquals(1, app_0.getSchedulingOpportunities(schedulerKey1));
-    assertEquals(2, app_0.getTotalRequiredResources(schedulerKey1));
+    assertEquals(2, app_0.getOutstandingAsksCount(schedulerKey1));
     assertEquals(0, app_0.getSchedulingOpportunities(schedulerKey2));
-    assertEquals(1, app_0.getTotalRequiredResources(schedulerKey2));
+    assertEquals(1, app_0.getOutstandingAsksCount(schedulerKey2));
 
     // Another off-switch, shouldn't allocate P1 due to delay scheduling
     // thus, no P2 either!
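
[Note on the API exercised above: the TestLeafQueue changes replace the ResourceRequest-oriented accessors with PendingAsk-based ones; getTotalRequiredResources(key) becomes getOutstandingAsksCount(key), and the per-container capability is read via getPendingAsk(key).getPerAllocationResource(). A minimal sketch of a PendingAsk value type with that surface follows. The field names and constructor are illustrative assumptions; only getPerAllocationResource() and getCount() are taken from the patch itself.]

    // Sketch only: an immutable (per-allocation resource, count) pair of the
    // shape these tests assume. Not the verbatim class added by this patch.
    package org.apache.hadoop.yarn.server.resourcemanager.scheduler.common;

    import org.apache.hadoop.yarn.api.records.Resource;
    import org.apache.hadoop.yarn.util.resource.Resources;

    public class PendingAsk {
      // Resource asked for by each allocation (e.g. <2GB, 1 vcore>).
      private final Resource perAllocationResource;
      // How many such allocations are still outstanding.
      private final int count;

      // Null object returned when an app has no pending ask for a key
      // (assumption: mirrors the PendingAsk.ZERO sentinel used by callers).
      public static final PendingAsk ZERO =
          new PendingAsk(Resources.none(), 0);

      public PendingAsk(Resource res, int num) {
        this.perAllocationResource = res;
        this.count = num;
      }

      public Resource getPerAllocationResource() {
        return perAllocationResource;
      }

      public int getCount() {
        return count;
      }
    }

[With such a null object, callers can compute pending memory as getPerAllocationResource().getMemorySize() * getCount() without null checks.]
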
@@ -2196,9 +2199,9 @@ public class TestLeafQueue {
     applyCSAssignment(clusterResource, assignment, a, nodes, apps);
     verifyNoContainerAllocated(assignment);
     assertEquals(2, app_0.getSchedulingOpportunities(schedulerKey1));
-    assertEquals(2, app_0.getTotalRequiredResources(schedulerKey1));
+    assertEquals(2, app_0.getOutstandingAsksCount(schedulerKey1));
     assertEquals(0, app_0.getSchedulingOpportunities(schedulerKey2));
-    assertEquals(1, app_0.getTotalRequiredResources(schedulerKey2));
+    assertEquals(1, app_0.getOutstandingAsksCount(schedulerKey2));
 
     // Another off-switch, shouldn't allocate OFF_SWITCH P1
     assignment = a.assignContainers(clusterResource, node_2,
@@ -2206,9 +2209,9 @@ public class TestLeafQueue {
     applyCSAssignment(clusterResource, assignment, a, nodes, apps);
     verifyContainerAllocated(assignment, NodeType.OFF_SWITCH);
     assertEquals(3, app_0.getSchedulingOpportunities(schedulerKey1));
-    assertEquals(1, app_0.getTotalRequiredResources(schedulerKey1));
+    assertEquals(1, app_0.getOutstandingAsksCount(schedulerKey1));
     assertEquals(0, app_0.getSchedulingOpportunities(schedulerKey2));
-    assertEquals(1, app_0.getTotalRequiredResources(schedulerKey2));
+    assertEquals(1, app_0.getOutstandingAsksCount(schedulerKey2));
 
     // Now, DATA_LOCAL for P1
     assignment = a.assignContainers(clusterResource, node_0,
@@ -2216,9 +2219,9 @@ public class TestLeafQueue {
     applyCSAssignment(clusterResource, assignment, a, nodes, apps);
     verifyContainerAllocated(assignment, NodeType.NODE_LOCAL);
     assertEquals(0, app_0.getSchedulingOpportunities(schedulerKey1));
-    assertEquals(0, app_0.getTotalRequiredResources(schedulerKey1));
+    assertEquals(0, app_0.getOutstandingAsksCount(schedulerKey1));
     assertEquals(0, app_0.getSchedulingOpportunities(schedulerKey2));
-    assertEquals(1, app_0.getTotalRequiredResources(schedulerKey2));
+    assertEquals(1, app_0.getOutstandingAsksCount(schedulerKey2));
 
     // Now, OFF_SWITCH for P2
     assignment = a.assignContainers(clusterResource, node_1,
@@ -2226,9 +2229,9 @@ public class TestLeafQueue {
     applyCSAssignment(clusterResource, assignment, a, nodes, apps);
     verifyContainerAllocated(assignment, NodeType.OFF_SWITCH);
     assertEquals(0, app_0.getSchedulingOpportunities(schedulerKey1));
-    assertEquals(0, app_0.getTotalRequiredResources(schedulerKey1));
+    assertEquals(0, app_0.getOutstandingAsksCount(schedulerKey1));
     assertEquals(1, app_0.getSchedulingOpportunities(schedulerKey2));
-    assertEquals(0, app_0.getTotalRequiredResources(schedulerKey2));
+    assertEquals(0, app_0.getOutstandingAsksCount(schedulerKey2));
   }
 
@@ -2309,7 +2312,7 @@ public class TestLeafQueue {
     verifyContainerAllocated(assignment, NodeType.NODE_LOCAL);
     assertEquals(0, app_0.getSchedulingOpportunities(schedulerKey)); // should reset
-    assertEquals(0, app_0.getTotalRequiredResources(schedulerKey));
+    assertEquals(0, app_0.getOutstandingAsksCount(schedulerKey));
 
     // No allocation on node_1_0 even though it's node/rack local since
     // required(ANY) == 0
@@ -2320,7 +2323,7 @@ public class TestLeafQueue {
     // Still zero
     // since #req=0
     assertEquals(0, app_0.getSchedulingOpportunities(schedulerKey));
-    assertEquals(0, app_0.getTotalRequiredResources(schedulerKey));
+    assertEquals(0, app_0.getOutstandingAsksCount(schedulerKey));
 
     // Add one request
     app_0_requests_0.clear();
@@ -2336,7 +2339,7 @@ public class TestLeafQueue {
     applyCSAssignment(clusterResource, assignment, a, nodes, apps);
     verifyNoContainerAllocated(assignment);
     assertEquals(1, app_0.getSchedulingOpportunities(schedulerKey));
-    assertEquals(1, app_0.getTotalRequiredResources(schedulerKey));
+    assertEquals(1, app_0.getOutstandingAsksCount(schedulerKey));
 
     // NODE_LOCAL - node_1
     assignment = a.assignContainers(clusterResource, node_1_0,
@@ -2345,7 +2348,7 @@ public class TestLeafQueue {
     verifyContainerAllocated(assignment, NodeType.NODE_LOCAL);
     // should reset
     assertEquals(0, app_0.getSchedulingOpportunities(schedulerKey));
-    assertEquals(0, app_0.getTotalRequiredResources(schedulerKey));
+    assertEquals(0, app_0.getOutstandingAsksCount(schedulerKey));
   }
 
   @Test (timeout = 30000)
@@ -2721,7 +2724,7 @@ public class TestLeafQueue {
     applyCSAssignment(clusterResource, assignment, a, nodes, apps);
     verifyNoContainerAllocated(assignment);
     assertEquals(0, app_0.getSchedulingOpportunities(schedulerKey));
-    assertEquals(1, app_0.getTotalRequiredResources(schedulerKey));
+    assertEquals(1, app_0.getOutstandingAsksCount(schedulerKey));
 
     // Now sanity-check node_local
     app_0_requests_0.add(
@@ -2752,7 +2755,7 @@ public class TestLeafQueue {
     applyCSAssignment(clusterResource, assignment, a, nodes, apps);
     verifyContainerAllocated(assignment, NodeType.NODE_LOCAL);
     assertEquals(0, app_0.getSchedulingOpportunities(schedulerKey));
-    assertEquals(0, app_0.getTotalRequiredResources(schedulerKey));
+    assertEquals(0, app_0.getOutstandingAsksCount(schedulerKey));
   }
 
@@ -3205,7 +3208,7 @@ public class TestLeafQueue {
     applyCSAssignment(clusterResource, assignment, a, nodes, apps);
     verifyContainerAllocated(assignment, NodeType.NODE_LOCAL);
     assertEquals(1, app_0.getSchedulingOpportunities(schedulerKey));
-    assertEquals(3, app_0.getTotalRequiredResources(schedulerKey));
+    assertEquals(3, app_0.getOutstandingAsksCount(schedulerKey));
     assertEquals(0, app_0.getLiveContainers().size());
     assertEquals(1, app_1.getLiveContainers().size());
   }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java
index d2b5ae175ce..c5b3f00ec67 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java
@@ -46,6 +46,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptS
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppSchedulingInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport;
@@ -56,6 +57,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaS
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.PendingAsk;
+import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -548,11 +551,12 @@ public class TestNodeLabelContainerAllocation {
       ApplicationAttemptId attemptId, int memory) {
     CapacityScheduler cs = (CapacityScheduler) rm.getRMContext().getScheduler();
     FiCaSchedulerApp app = cs.getApplicationAttempt(attemptId);
-    ResourceRequest rr =
-        app.getAppSchedulingInfo().getResourceRequest(
+    PendingAsk ask =
+        app.getAppSchedulingInfo().getPendingAsk(
            TestUtils.toSchedulerKey(priority), "*");
     Assert.assertEquals(memory,
-        rr.getCapability().getMemorySize() * rr.getNumContainers());
+        ask.getPerAllocationResource().getMemorySize() * ask
+            .getCount());
   }
 
   private void checkLaunchedContainerNumOnNode(MockRM rm, NodeId nodeId,
@@ -607,18 +611,10 @@ public class TestNodeLabelContainerAllocation {
         (CapacityScheduler) rm1.getRMContext().getScheduler();
     FiCaSchedulerApp app =
         cs.getApplicationAttempt(am1.getApplicationAttemptId());
-    List<ResourceRequest> allResourceRequests =
-        app.getAppSchedulingInfo().getAllResourceRequests();
-    for (ResourceRequest changeReq : allResourceRequests) {
-      if (changeReq.getPriority().getPriority() == 2
-          || changeReq.getPriority().getPriority() == 3) {
-        Assert.assertEquals("Expected label y", "y",
-            changeReq.getNodeLabelExpression());
-      } else if (changeReq.getPriority().getPriority() == 4) {
-        Assert.assertEquals("Expected label EMPTY",
-            RMNodeLabelsManager.NO_LABEL, changeReq.getNodeLabelExpression());
-      }
-    }
+    checkNodePartitionOfRequestedPriority(app.getAppSchedulingInfo(), 2, "y");
+    checkNodePartitionOfRequestedPriority(app.getAppSchedulingInfo(), 3, "y");
+    checkNodePartitionOfRequestedPriority(app.getAppSchedulingInfo(), 4,
+        RMNodeLabelsManager.NO_LABEL);
 
     // Previous any request was Y trying to update with z and the
     // request before ANY label is null
@@ -628,17 +624,11 @@ public class TestNodeLabelContainerAllocation {
     newReq.add(am1.createResourceReq("h1:1234", 1024, 3, 4, null));
     newReq.add(am1.createResourceReq("*", 1024, 4, 5, "z"));
     am1.allocate(newReq, new ArrayList<ContainerId>());
-    allResourceRequests = app.getAppSchedulingInfo().getAllResourceRequests();
-    for (ResourceRequest changeReq : allResourceRequests) {
-      if (changeReq.getPriority().getPriority() == 3
-          || changeReq.getPriority().getPriority() == 4) {
-        Assert.assertEquals("Expected label z", "z",
-            changeReq.getNodeLabelExpression());
-      } else if (changeReq.getPriority().getPriority() == 2) {
-        Assert.assertEquals("Expected label y", "y",
-            changeReq.getNodeLabelExpression());
-      }
-    }
+
+    checkNodePartitionOfRequestedPriority(app.getAppSchedulingInfo(), 3, "z");
+    checkNodePartitionOfRequestedPriority(app.getAppSchedulingInfo(), 4, "z");
+    checkNodePartitionOfRequestedPriority(app.getAppSchedulingInfo(), 2, "y");
+
     // Request before ANY and ANY request is set as NULL. Request should be set
     // with Empty Label
     List<ResourceRequest> resourceRequest1 = new ArrayList<ResourceRequest>();
@@ -653,14 +643,21 @@ public class TestNodeLabelContainerAllocation {
         RMNodeLabelsManager.NO_LABEL));
     resourceRequest1.add(am1.createResourceReq("h2:1234", 1024, 2, 4, null));
     am1.allocate(resourceRequest1, new ArrayList<ContainerId>());
-    allResourceRequests = app.getAppSchedulingInfo().getAllResourceRequests();
-    for (ResourceRequest changeReq : allResourceRequests) {
-      if (changeReq.getPriority().getPriority() == 3) {
-        Assert.assertEquals("Expected label Empty",
-            RMNodeLabelsManager.NO_LABEL, changeReq.getNodeLabelExpression());
-      } else if (changeReq.getPriority().getPriority() == 2) {
-        Assert.assertEquals("Expected label y", RMNodeLabelsManager.NO_LABEL,
-            changeReq.getNodeLabelExpression());
+
+    checkNodePartitionOfRequestedPriority(app.getAppSchedulingInfo(), 3,
+        RMNodeLabelsManager.NO_LABEL);
+    checkNodePartitionOfRequestedPriority(app.getAppSchedulingInfo(), 2,
+        RMNodeLabelsManager.NO_LABEL);
+  }
+
+  private void checkNodePartitionOfRequestedPriority(AppSchedulingInfo info,
+      int priority, String expectedPartition) {
+    for (SchedulerRequestKey key : info.getSchedulerKeys()) {
+      if (key.getPriority().getPriority() == priority) {
+        Assert.assertEquals("Expected partition is " + expectedPartition,
+            expectedPartition,
+            info.getSchedulingPlacementSet(key)
+                .getPrimaryRequestedNodePartition());
       }
     }
   }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java
index f9bf89de604..3a154b2c220 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java
@@ -329,7 +329,7 @@ public class TestReservations {
     assertEquals(5 * GB, node_0.getAllocatedResource().getMemorySize());
     assertEquals(3 * GB, node_1.getAllocatedResource().getMemorySize());
     assertEquals(0 * GB, node_2.getAllocatedResource().getMemorySize());
-    assertEquals(2, app_0.getTotalRequiredResources(
+    assertEquals(2, app_0.getOutstandingAsksCount(
         toSchedulerKey(priorityReduce)));
 
     // try to assign reducer (5G on node 0 and should reserve)
@@ -348,7 +348,7 @@ public class TestReservations {
     assertEquals(5 * GB, node_0.getAllocatedResource().getMemorySize());
     assertEquals(3 * GB, node_1.getAllocatedResource().getMemorySize());
     assertEquals(0 * GB, node_2.getAllocatedResource().getMemorySize());
-    assertEquals(2, app_0.getTotalRequiredResources(
+    assertEquals(2, app_0.getOutstandingAsksCount(
        toSchedulerKey(priorityReduce)));
 
     // assign reducer to node 2
@@ -367,7 +367,7 @@ public class TestReservations {
     assertEquals(5 * GB, node_0.getAllocatedResource().getMemorySize());
     assertEquals(3 * GB, node_1.getAllocatedResource().getMemorySize());
     assertEquals(5 * GB, node_2.getAllocatedResource().getMemorySize());
-    assertEquals(1, app_0.getTotalRequiredResources(
+    assertEquals(1, app_0.getOutstandingAsksCount(
        toSchedulerKey(priorityReduce)));
 
     // node_1 heartbeat and unreserves from node_0 in order to allocate
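
[Aside on the TestReservations assertions above and below: getOutstandingAsksCount(key) reads as the number of allocations still pending for that scheduler key. A plausible equivalence, assuming the getPendingAsk accessors introduced by this patch, is sketched here; the helper name is hypothetical.]

    // Hypothetical helper: the outstanding ask count for a scheduler key is
    // assumed to equal the count of its ANY (resource-name "*") pending ask.
    static int outstandingAsksCount(FiCaSchedulerApp app,
        SchedulerRequestKey key) {
      return app.getAppSchedulingInfo()
          .getPendingAsk(key, ResourceRequest.ANY)
          .getCount();
    }
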
@@ -386,7 +386,7 @@ public class TestReservations {
     assertEquals(5 * GB, node_0.getAllocatedResource().getMemorySize());
     assertEquals(8 * GB, node_1.getAllocatedResource().getMemorySize());
     assertEquals(5 * GB, node_2.getAllocatedResource().getMemorySize());
-    assertEquals(0, app_0.getTotalRequiredResources(
+    assertEquals(0, app_0.getOutstandingAsksCount(
        toSchedulerKey(priorityReduce)));
   }
 
@@ -662,7 +662,7 @@ public class TestReservations {
     assertEquals(5 * GB, node_0.getAllocatedResource().getMemorySize());
     assertEquals(3 * GB, node_1.getAllocatedResource().getMemorySize());
     assertEquals(0 * GB, node_2.getAllocatedResource().getMemorySize());
-    assertEquals(2, app_0.getTotalRequiredResources(
+    assertEquals(2, app_0.getOutstandingAsksCount(
        toSchedulerKey(priorityReduce)));
 
     // try to assign reducer (5G on node 0 and should reserve)
@@ -681,7 +681,7 @@ public class TestReservations {
     assertEquals(5 * GB, node_0.getAllocatedResource().getMemorySize());
     assertEquals(3 * GB, node_1.getAllocatedResource().getMemorySize());
     assertEquals(0 * GB, node_2.getAllocatedResource().getMemorySize());
-    assertEquals(2, app_0.getTotalRequiredResources(
+    assertEquals(2, app_0.getOutstandingAsksCount(
        toSchedulerKey(priorityReduce)));
 
     // assign reducer to node 2
@@ -700,7 +700,7 @@ public class TestReservations {
     assertEquals(5 * GB, node_0.getAllocatedResource().getMemorySize());
     assertEquals(3 * GB, node_1.getAllocatedResource().getMemorySize());
     assertEquals(5 * GB, node_2.getAllocatedResource().getMemorySize());
-    assertEquals(1, app_0.getTotalRequiredResources(
+    assertEquals(1, app_0.getOutstandingAsksCount(
        toSchedulerKey(priorityReduce)));
 
     // node_1 heartbeat and won't unreserve from node_0, potentially stuck
@@ -720,7 +720,7 @@ public class TestReservations {
     assertEquals(5 * GB, node_0.getAllocatedResource().getMemorySize());
     assertEquals(3 * GB, node_1.getAllocatedResource().getMemorySize());
     assertEquals(5 * GB, node_2.getAllocatedResource().getMemorySize());
-    assertEquals(1, app_0.getTotalRequiredResources(
+    assertEquals(1, app_0.getOutstandingAsksCount(
        toSchedulerKey(priorityReduce)));
   }
 
@@ -841,7 +841,7 @@ public class TestReservations {
     assertEquals(null, node_0.getReservedContainer());
     assertEquals(5 * GB, node_0.getAllocatedResource().getMemorySize());
     assertEquals(3 * GB, node_1.getAllocatedResource().getMemorySize());
-    assertEquals(2, app_0.getTotalRequiredResources(
+    assertEquals(2, app_0.getOutstandingAsksCount(
        toSchedulerKey(priorityReduce)));
 
     // try to assign reducer (5G on node 0 and should reserve)
@@ -859,7 +859,7 @@ public class TestReservations {
         .getMemorySize());
     assertEquals(5 * GB, node_0.getAllocatedResource().getMemorySize());
     assertEquals(3 * GB, node_1.getAllocatedResource().getMemorySize());
-    assertEquals(2, app_0.getTotalRequiredResources(
+    assertEquals(2, app_0.getOutstandingAsksCount(
        toSchedulerKey(priorityReduce)));
 
     // could allocate but told need to unreserve first
@@ -876,7 +876,7 @@ public class TestReservations {
     assertEquals(null, node_0.getReservedContainer());
     assertEquals(5 * GB, node_0.getAllocatedResource().getMemorySize());
     assertEquals(8 * GB, node_1.getAllocatedResource().getMemorySize());
-    assertEquals(1, app_0.getTotalRequiredResources(
+    assertEquals(1, app_0.getOutstandingAsksCount(
        toSchedulerKey(priorityReduce)));
   }
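
[Summary note: the test migrations in this patch follow one mechanical pattern. A condensed before/after sketch, using identifiers that appear in the patch; the two wrapper methods themselves are illustrative, not part of the patch, and would not compile side by side since the old accessor is removed.]

    // Old style: reach into the raw ResourceRequest (removed by this patch).
    long pendingMemBefore(FiCaSchedulerApp app, SchedulerRequestKey key) {
      ResourceRequest rr = app.getAppSchedulingInfo()
          .getResourceRequest(key, ResourceRequest.ANY);
      return rr.getCapability().getMemorySize() * rr.getNumContainers();
    }

    // New style: the same facts come from the API-independent PendingAsk.
    long pendingMemAfter(FiCaSchedulerApp app, SchedulerRequestKey key) {
      PendingAsk ask = app.getAppSchedulingInfo()
          .getPendingAsk(key, ResourceRequest.ANY);
      return ask.getPerAllocationResource().getMemorySize() * ask.getCount();
    }

[Node-label assertions move the same way: instead of scanning getAllResourceRequests() for a priority and checking getNodeLabelExpression(), tests ask the per-key SchedulingPlacementSet for getPrimaryRequestedNodePartition(), as checkNodePartitionOfRequestedPriority() does in TestNodeLabelContainerAllocation above.]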