diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index b51d89e798b..5f406690b41 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -546,6 +546,9 @@ Release 2.8.0 - UNRELEASED YARN-4285. Display resource usage as percentage of queue and cluster in the RM UI (Varun Vasudev via wangda) + YARN-3216. Max-AM-Resource-Percentage should respect node labels. + (Sunil G via wangda) + OPTIMIZATIONS YARN-3339. TestDockerContainerExecutor should pull a single image and not diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java index 23f00e03a53..c5f8defe921 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java @@ -47,6 +47,7 @@ import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager; import org.apache.hadoop.yarn.server.api.ContainerType; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AggregateAppResourceUsage; @@ -146,7 +147,9 @@ public class SchedulerApplicationAttempt implements SchedulableEntity { protected Queue queue; protected boolean isStopped = false; - + + private String appAMNodePartitionName = CommonNodeLabelsManager.NO_LABEL; + protected final RMContext rmContext; public SchedulerApplicationAttempt(ApplicationAttemptId applicationAttemptId, @@ -247,10 +250,18 @@ public class SchedulerApplicationAttempt implements SchedulableEntity { return attemptResourceUsage.getAMUsed(); } + public Resource getAMResource(String label) { + return attemptResourceUsage.getAMUsed(label); + } + public void setAMResource(Resource amResource) { attemptResourceUsage.setAMUsed(amResource); } + public void setAMResource(String label, Resource amResource) { + attemptResourceUsage.setAMUsed(label, amResource); + } + public boolean isAmRunning() { return amRunning; } @@ -886,4 +897,12 @@ public class SchedulerApplicationAttempt implements SchedulableEntity { SchedContainerChangeRequest increaseRequest) { changeContainerResource(increaseRequest, true); } + + public void setAppAMNodePartitionName(String partitionName) { + this.appAMNodePartitionName = partitionName; + } + + public String getAppAMNodePartitionName() { + return appAMNodePartitionName; + } } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java index 1206026f6a1..2f981a74823 
100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java @@ -138,11 +138,16 @@ class CSQueueUtils { csConf.getNonLabeledQueueCapacity(queuePath) / 100); queueCapacities.setMaximumCapacity(CommonNodeLabelsManager.NO_LABEL, csConf.getNonLabeledQueueMaximumCapacity(queuePath) / 100); + queueCapacities.setMaxAMResourcePercentage( + CommonNodeLabelsManager.NO_LABEL, + csConf.getMaximumAMResourcePercentPerPartition(queuePath, label)); } else { queueCapacities.setCapacity(label, csConf.getLabeledQueueCapacity(queuePath, label) / 100); queueCapacities.setMaximumCapacity(label, csConf.getLabeledQueueMaximumCapacity(queuePath, label) / 100); + queueCapacities.setMaxAMResourcePercentage(label, + csConf.getMaximumAMResourcePercentPerPartition(queuePath, label)); } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java index b1461c1040c..a99122d9905 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java @@ -85,7 +85,7 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur @Private public static final String MAXIMUM_APPLICATION_MASTERS_RESOURCE_PERCENT = PREFIX + MAXIMUM_AM_RESOURCE_SUFFIX; - + @Private public static final String QUEUES = "queues"; @@ -138,7 +138,7 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur @Private public static final float DEFAULT_MAXIMUM_APPLICATIONMASTERS_RESOURCE_PERCENT = 0.1f; - + @Private public static final float UNDEFINED = -1; @@ -514,6 +514,21 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur set(getQueuePrefix(queue) + DEFAULT_NODE_LABEL_EXPRESSION, exp); } + public float getMaximumAMResourcePercentPerPartition(String queue, + String label) { + // If per-partition max-am-resource-percent is not configured, + // use default value as max-am-resource-percent for this queue. + return getFloat(getNodeLabelPrefix(queue, label) + + MAXIMUM_AM_RESOURCE_SUFFIX, + getMaximumApplicationMasterResourcePerQueuePercent(queue)); + } + + public void setMaximumAMResourcePercentPerPartition(String queue, + String label, float percent) { + setFloat(getNodeLabelPrefix(queue, label) + + MAXIMUM_AM_RESOURCE_SUFFIX, percent); + } + /* * Returns whether we should continue to look at all heart beating nodes even * after the reservation limit was hit. 
The node heart beating in could diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index a993ece7ae2..d92a821deef 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -84,7 +84,7 @@ public class LeafQueue extends AbstractCSQueue { protected int maxApplicationsPerUser; private float maxAMResourcePerQueuePercent; - + private volatile int nodeLocalityDelay; Map applicationAttemptMap = @@ -122,8 +122,8 @@ public class LeafQueue extends AbstractCSQueue { // preemption, key is the partition of the RMContainer allocated on private Map> ignorePartitionExclusivityRMContainers = new HashMap<>(); - - public LeafQueue(CapacitySchedulerContext cs, + + public LeafQueue(CapacitySchedulerContext cs, String queueName, CSQueue parent, CSQueue old) throws IOException { super(cs, queueName, parent, old); this.scheduler = cs; @@ -538,79 +538,120 @@ public class LeafQueue extends AbstractCSQueue { } public synchronized Resource getAMResourceLimit() { - /* - * The limit to the amount of resources which can be consumed by - * application masters for applications running in the queue - * is calculated by taking the greater of the max resources currently - * available to the queue (see absoluteMaxAvailCapacity) and the absolute - * resources guaranteed for the queue and multiplying it by the am - * resource percent. - * - * This is to allow a queue to grow its (proportional) application - * master resource use up to its max capacity when other queues are - * idle but to scale back down to it's guaranteed capacity as they - * become busy. - * - */ - Resource queueCurrentLimit; - synchronized (queueResourceLimitsInfo) { - queueCurrentLimit = queueResourceLimitsInfo.getQueueCurrentLimit(); - } - Resource queueCap = Resources.max(resourceCalculator, lastClusterResource, - absoluteCapacityResource, queueCurrentLimit); - Resource amResouceLimit = - Resources.multiplyAndNormalizeUp(resourceCalculator, queueCap, - maxAMResourcePerQueuePercent, minimumAllocation); + return getAMResourceLimitPerPartition(RMNodeLabelsManager.NO_LABEL); + } + + public synchronized Resource getUserAMResourceLimit() { + return getUserAMResourceLimitPerPartition(RMNodeLabelsManager.NO_LABEL); + } + + public synchronized Resource getUserAMResourceLimitPerPartition( + String nodePartition) { + /* + * The user am resource limit is based on the same approach as the user + * limit (as it should represent a subset of that). 
This means that it uses + * the absolute queue capacity (per partition) instead of the max and is + * modified by the userlimit and the userlimit factor as is the userlimit + */ + float effectiveUserLimit = Math.max(userLimit / 100.0f, + 1.0f / Math.max(getActiveUsersManager().getNumActiveUsers(), 1)); + + Resource queuePartitionResource = Resources.multiplyAndNormalizeUp( + resourceCalculator, + labelManager.getResourceByLabel(nodePartition, lastClusterResource), + queueCapacities.getAbsoluteCapacity(nodePartition), minimumAllocation); + + return Resources.multiplyAndNormalizeUp(resourceCalculator, + queuePartitionResource, + queueCapacities.getMaxAMResourcePercentage(nodePartition) + * effectiveUserLimit * userLimitFactor, minimumAllocation); + } + + public synchronized Resource getAMResourceLimitPerPartition( + String nodePartition) { + /* + * For non-labeled partition, get the max value from resources currently + * available to the queue and the absolute resources guaranteed for the + * partition in the queue. For labeled partition, consider only the absolute + * resources guaranteed. Multiply this value (based on labeled/ + * non-labeled), * with per-partition am-resource-percent to get the max am + * resource limit for this queue and partition. + */ + Resource queuePartitionResource = Resources.multiplyAndNormalizeUp( + resourceCalculator, + labelManager.getResourceByLabel(nodePartition, lastClusterResource), + queueCapacities.getAbsoluteCapacity(nodePartition), minimumAllocation); + + Resource queueCurrentLimit = Resources.none(); + // For non-labeled partition, we need to consider the current queue + // usage limit. + if (nodePartition.equals(RMNodeLabelsManager.NO_LABEL)) { + synchronized (queueResourceLimitsInfo) { + queueCurrentLimit = queueResourceLimitsInfo.getQueueCurrentLimit(); + } + } + + float amResourcePercent = queueCapacities + .getMaxAMResourcePercentage(nodePartition); + + // Current usable resource for this queue and partition is the max of + // queueCurrentLimit and queuePartitionResource. + Resource queuePartitionUsableResource = Resources.max(resourceCalculator, + lastClusterResource, queueCurrentLimit, queuePartitionResource); + + Resource amResouceLimit = Resources.multiplyAndNormalizeUp( + resourceCalculator, queuePartitionUsableResource, amResourcePercent, + minimumAllocation); metrics.setAMResouceLimit(amResouceLimit); return amResouceLimit; } - - public synchronized Resource getUserAMResourceLimit() { - /* - * The user amresource limit is based on the same approach as the - * user limit (as it should represent a subset of that). 
This means that - * it uses the absolute queue capacity instead of the max and is modified - * by the userlimit and the userlimit factor as is the userlimit - * - */ - float effectiveUserLimit = Math.max(userLimit / 100.0f, 1.0f / - Math.max(getActiveUsersManager().getNumActiveUsers(), 1)); - - return Resources.multiplyAndNormalizeUp( - resourceCalculator, - absoluteCapacityResource, - maxAMResourcePerQueuePercent * effectiveUserLimit * - userLimitFactor, minimumAllocation); - } private synchronized void activateApplications() { - //limit of allowed resource usage for application masters - Resource amLimit = getAMResourceLimit(); - Resource userAMLimit = getUserAMResourceLimit(); - + // limit of allowed resource usage for application masters + Map amPartitionLimit = new HashMap(); + Map userAmPartitionLimit = + new HashMap(); + for (Iterator i = getPendingAppsOrderingPolicy() .getAssignmentIterator(); i.hasNext();) { FiCaSchedulerApp application = i.next(); ApplicationId applicationId = application.getApplicationId(); - // Check am resource limit - Resource amIfStarted = - Resources.add(application.getAMResource(), queueUsage.getAMUsed()); - - if (LOG.isDebugEnabled()) { - LOG.debug("application AMResource " + application.getAMResource() + - " maxAMResourcePerQueuePercent " + maxAMResourcePerQueuePercent + - " amLimit " + amLimit + - " lastClusterResource " + lastClusterResource + - " amIfStarted " + amIfStarted); + + // Get the am-node-partition associated with each application + // and calculate max-am resource limit for this partition. + String partitionName = application.getAppAMNodePartitionName(); + + Resource amLimit = amPartitionLimit.get(partitionName); + // Verify whether we already calculated am-limit for this label. + if (amLimit == null) { + amLimit = getAMResourceLimitPerPartition(partitionName); + amPartitionLimit.put(partitionName, amLimit); } - - if (!Resources.lessThanOrEqual( - resourceCalculator, lastClusterResource, amIfStarted, amLimit)) { - if (getNumActiveApplications() < 1) { - LOG.warn("maximum-am-resource-percent is insufficient to start a" + - " single application in queue, it is likely set too low." + - " skipping enforcement to allow at least one application to start"); + // Check am resource limit. + Resource amIfStarted = Resources.add( + application.getAMResource(partitionName), + queueUsage.getAMUsed(partitionName)); + + if (LOG.isDebugEnabled()) { + LOG.debug("application AMResource " + + application.getAMResource(partitionName) + + " maxAMResourcePerQueuePercent " + maxAMResourcePerQueuePercent + + " amLimit " + amLimit + " lastClusterResource " + + lastClusterResource + " amIfStarted " + amIfStarted + + " AM node-partition name " + partitionName); + } + + if (!Resources.lessThanOrEqual(resourceCalculator, lastClusterResource, + amIfStarted, amLimit)) { + if (getNumActiveApplications() < 1 + || (Resources.lessThanOrEqual(resourceCalculator, + lastClusterResource, queueUsage.getAMUsed(partitionName), + Resources.none()))) { + LOG.warn("maximum-am-resource-percent is insufficient to start a" + + " single application in queue, it is likely set too low." 
+ + " skipping enforcement to allow at least one application" + + " to start"); } else { LOG.info("Not activating application " + applicationId + " as amIfStarted: " + amIfStarted + " exceeds amLimit: " @@ -618,22 +659,31 @@ public class LeafQueue extends AbstractCSQueue { continue; } } - + // Check user am resource limit - User user = getUser(application.getUser()); - - Resource userAmIfStarted = - Resources.add(application.getAMResource(), - user.getConsumedAMResources()); - - if (!Resources.lessThanOrEqual( - resourceCalculator, lastClusterResource, userAmIfStarted, - userAMLimit)) { - if (getNumActiveApplications() < 1) { - LOG.warn("maximum-am-resource-percent is insufficient to start a" + - " single application in queue for user, it is likely set too low." + - " skipping enforcement to allow at least one application to start"); + Resource userAMLimit = userAmPartitionLimit.get(partitionName); + + // Verify whether we already calculated user-am-limit for this label. + if (userAMLimit == null) { + userAMLimit = getUserAMResourceLimitPerPartition(partitionName); + userAmPartitionLimit.put(partitionName, userAMLimit); + } + + Resource userAmIfStarted = Resources.add( + application.getAMResource(partitionName), + user.getConsumedAMResources(partitionName)); + + if (!Resources.lessThanOrEqual(resourceCalculator, lastClusterResource, + userAmIfStarted, userAMLimit)) { + if (getNumActiveApplications() < 1 + || (Resources.lessThanOrEqual(resourceCalculator, + lastClusterResource, queueUsage.getAMUsed(partitionName), + Resources.none()))) { + LOG.warn("maximum-am-resource-percent is insufficient to start a" + + " single application in queue for user, it is likely set too" + + " low. skipping enforcement to allow at least one application" + + " to start"); } else { LOG.info("Not activating application " + applicationId + " for user: " + user + " as userAmIfStarted: " @@ -643,9 +693,12 @@ public class LeafQueue extends AbstractCSQueue { } user.activateApplication(); orderingPolicy.addSchedulableEntity(application); - queueUsage.incAMUsed(application.getAMResource()); - user.getResourceUsage().incAMUsed(application.getAMResource()); - metrics.incAMUsed(application.getUser(), application.getAMResource()); + queueUsage.incAMUsed(partitionName, + application.getAMResource(partitionName)); + user.getResourceUsage().incAMUsed(partitionName, + application.getAMResource(partitionName)); + metrics.incAMUsed(application.getUser(), + application.getAMResource(partitionName)); metrics.setAMResouceLimitForUser(application.getUser(), userAMLimit); i.remove(); LOG.info("Application " + applicationId + " from user: " @@ -693,13 +746,16 @@ public class LeafQueue extends AbstractCSQueue { public synchronized void removeApplicationAttempt( FiCaSchedulerApp application, User user) { + String partitionName = application.getAppAMNodePartitionName(); boolean wasActive = orderingPolicy.removeSchedulableEntity(application); if (!wasActive) { pendingOrderingPolicy.removeSchedulableEntity(application); } else { - queueUsage.decAMUsed(application.getAMResource()); - user.getResourceUsage().decAMUsed(application.getAMResource()); + queueUsage.decAMUsed(partitionName, + application.getAMResource(partitionName)); + user.getResourceUsage().decAMUsed(partitionName, + application.getAMResource(partitionName)); metrics.decAMUsed(application.getUser(), application.getAMResource()); } applicationAttemptMap.remove(application.getApplicationAttemptId()); @@ -1328,6 +1384,22 @@ public class LeafQueue extends AbstractCSQueue { 
super.decUsedResource(nodeLabel, resourceToDec, application); } + public void incAMUsedResource(String nodeLabel, Resource resourceToInc, + SchedulerApplicationAttempt application) { + getUser(application.getUser()).getResourceUsage().incAMUsed(nodeLabel, + resourceToInc); + // ResourceUsage has its own lock, no addition lock needs here. + queueUsage.incAMUsed(nodeLabel, resourceToInc); + } + + public void decAMUsedResource(String nodeLabel, Resource resourceToDec, + SchedulerApplicationAttempt application) { + getUser(application.getUser()).getResourceUsage().decAMUsed(nodeLabel, + resourceToDec); + // ResourceUsage has its own lock, no addition lock needs here. + queueUsage.decAMUsed(nodeLabel, resourceToDec); + } + @VisibleForTesting public static class User { ResourceUsage userResourceUsage = new ResourceUsage(); @@ -1363,6 +1435,10 @@ public class LeafQueue extends AbstractCSQueue { return userResourceUsage.getAMUsed(); } + public Resource getConsumedAMResources(String label) { + return userResourceUsage.getAMUsed(label); + } + public int getTotalApplications() { return getPendingApplications() + getActiveApplications(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/QueueCapacities.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/QueueCapacities.java index d0a26d6b209..65100f614b7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/QueueCapacities.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/QueueCapacities.java @@ -49,7 +49,8 @@ public class QueueCapacities { // Usage enum here to make implement cleaner private enum CapacityType { - USED_CAP(0), ABS_USED_CAP(1), MAX_CAP(2), ABS_MAX_CAP(3), CAP(4), ABS_CAP(5); + USED_CAP(0), ABS_USED_CAP(1), MAX_CAP(2), ABS_MAX_CAP(3), CAP(4), ABS_CAP(5), + MAX_AM_PERC(6); private int idx; @@ -74,6 +75,7 @@ public class QueueCapacities { sb.append("abs_max_cap=" + capacitiesArr[3] + "%, "); sb.append("cap=" + capacitiesArr[4] + "%, "); sb.append("abs_cap=" + capacitiesArr[5] + "%}"); + sb.append("max_am_perc=" + capacitiesArr[6] + "%}"); return sb.toString(); } } @@ -213,7 +215,16 @@ public class QueueCapacities { public void setAbsoluteMaximumCapacity(String label, float value) { _set(label, CapacityType.ABS_MAX_CAP, value); } - + + /* Absolute Maximum AM resource percentage Getter and Setter */ + public float getMaxAMResourcePercentage(String label) { + return _get(label, CapacityType.MAX_AM_PERC); + } + + public void setMaxAMResourcePercentage(String label, float value) { + _set(label, CapacityType.MAX_AM_PERC, value); + } + /** * Clear configurable fields, like * (absolute)capacity/(absolute)maximum-capacity, this will be used by queue diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java index 
e97da24ed80..6f4bfe59610 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java @@ -37,6 +37,7 @@ import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger; import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; @@ -99,18 +100,26 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt { super(applicationAttemptId, user, queue, activeUsersManager, rmContext); RMApp rmApp = rmContext.getRMApps().get(getApplicationId()); - + Resource amResource; + String partition; + if (rmApp == null || rmApp.getAMResourceRequest() == null) { - //the rmApp may be undefined (the resource manager checks for this too) - //and unmanaged applications do not provide an amResource request - //in these cases, provide a default using the scheduler + // the rmApp may be undefined (the resource manager checks for this too) + // and unmanaged applications do not provide an amResource request + // in these cases, provide a default using the scheduler amResource = rmContext.getScheduler().getMinimumResourceCapability(); + partition = CommonNodeLabelsManager.NO_LABEL; } else { amResource = rmApp.getAMResourceRequest().getCapability(); + partition = + (rmApp.getAMResourceRequest().getNodeLabelExpression() == null) + ? 
CommonNodeLabelsManager.NO_LABEL + : rmApp.getAMResourceRequest().getNodeLabelExpression(); } - - setAMResource(amResource); + + setAppAMNodePartitionName(partition); + setAMResource(partition, amResource); setPriority(appPriority); scheduler = rmContext.getScheduler(); @@ -143,8 +152,8 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt { containersToPreempt.remove(rmContainer.getContainerId()); - RMAuditLogger.logSuccess(getUser(), - AuditConstants.RELEASE_CONTAINER, "SchedulerApp", + RMAuditLogger.logSuccess(getUser(), + AuditConstants.RELEASE_CONTAINER, "SchedulerApp", getApplicationId(), containerId); // Update usage metrics @@ -185,10 +194,10 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt { // Update consumption and track allocations List resourceRequestList = appSchedulingInfo.allocate( type, node, priority, request, container); - attemptResourceUsage.incUsed(node.getPartition(), - container.getResource()); - - // Update resource requests related to "request" and store in RMContainer + + attemptResourceUsage.incUsed(node.getPartition(), container.getResource()); + + // Update resource requests related to "request" and store in RMContainer ((RMContainerImpl)rmContainer).setResourceRequests(resourceRequestList); // Inform the container @@ -201,8 +210,8 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt { + " container=" + container.getId() + " host=" + container.getNodeId().getHost() + " type=" + type); } - RMAuditLogger.logSuccess(getUser(), - AuditConstants.ALLOC_CONTAINER, "SchedulerApp", + RMAuditLogger.logSuccess(getUser(), + AuditConstants.ALLOC_CONTAINER, "SchedulerApp", getApplicationId(), container.getId()); return rmContainer; @@ -483,5 +492,14 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt { this.attemptResourceUsage.incUsed(newPartition, containerResource); getCSLeafQueue().decUsedResource(oldPartition, containerResource, this); getCSLeafQueue().incUsedResource(newPartition, containerResource, this); + + // Update new partition name if container is AM and also update AM resource + if (rmContainer.isAMContainer()) { + setAppAMNodePartitionName(newPartition); + this.attemptResourceUsage.decAMUsed(oldPartition, containerResource); + this.attemptResourceUsage.incAMUsed(newPartition, containerResource); + getCSLeafQueue().decAMUsedResource(oldPartition, containerResource, this); + getCSLeafQueue().incAMUsedResource(newPartition, containerResource, this); + } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java index 6923dd223fe..0372cd7855a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java @@ -56,6 +56,7 @@ import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.LogAggregationContext; +import org.apache.hadoop.yarn.api.records.ResourceRequest; import 
org.apache.hadoop.yarn.api.records.SignalContainerCommand; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeState; @@ -354,7 +355,19 @@ public class MockRM extends ResourceManager { super.getConfig().getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null); } - + + public RMApp submitApp(int masterMemory, String name, String user, + Map acls, String queue, String amLabel) + throws Exception { + Resource resource = Records.newRecord(Resource.class); + resource.setMemory(masterMemory); + Priority priority = Priority.newInstance(0); + return submitApp(resource, name, user, acls, false, queue, + super.getConfig().getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, + YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null, null, true, false, + false, null, 0, null, true, priority, amLabel); + } + public RMApp submitApp(Resource resource, String name, String user, Map acls, String queue) throws Exception { return submitApp(resource, name, user, acls, false, queue, @@ -449,7 +462,20 @@ public class MockRM extends ResourceManager { boolean waitForAccepted, boolean keepContainers, boolean isAppIdProvided, ApplicationId applicationId, long attemptFailuresValidityInterval, LogAggregationContext logAggregationContext, - boolean cancelTokensWhenComplete, Priority priority) + boolean cancelTokensWhenComplete, Priority priority) throws Exception { + return submitApp(capability, name, user, acls, unmanaged, queue, + maxAppAttempts, ts, appType, waitForAccepted, keepContainers, + isAppIdProvided, applicationId, attemptFailuresValidityInterval, + logAggregationContext, cancelTokensWhenComplete, priority, ""); + } + + public RMApp submitApp(Resource capability, String name, String user, + Map acls, boolean unmanaged, String queue, + int maxAppAttempts, Credentials ts, String appType, + boolean waitForAccepted, boolean keepContainers, boolean isAppIdProvided, + ApplicationId applicationId, long attemptFailuresValidityInterval, + LogAggregationContext logAggregationContext, + boolean cancelTokensWhenComplete, Priority priority, String amLabel) throws Exception { ApplicationId appId = isAppIdProvided ? 
applicationId : null; ApplicationClientProtocol client = getClientRMService(); @@ -492,6 +518,12 @@ public class MockRM extends ResourceManager { sub.setLogAggregationContext(logAggregationContext); } sub.setCancelTokensWhenComplete(cancelTokensWhenComplete); + if (amLabel != null && !amLabel.isEmpty()) { + ResourceRequest amResourceRequest = ResourceRequest.newInstance( + Priority.newInstance(0), ResourceRequest.ANY, capability, 1); + amResourceRequest.setNodeLabelExpression(amLabel.trim()); + sub.setAMContainerResourceRequest(amResourceRequest); + } req.setApplicationSubmissionContext(sub); UserGroupInformation fakeUser = UserGroupInformation.createUserForTesting(user, new String[] {"someGroup"}); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java index 8c4ffd41fe2..9f4b9f5daee 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java @@ -46,6 +46,7 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager; @@ -154,6 +155,10 @@ public class TestApplicationLimits { doReturn(user).when(application).getUser(); doReturn(amResource).when(application).getAMResource(); doReturn(Priority.newInstance(0)).when(application).getPriority(); + doReturn(CommonNodeLabelsManager.NO_LABEL).when(application) + .getAppAMNodePartitionName(); + doReturn(amResource).when(application).getAMResource( + CommonNodeLabelsManager.NO_LABEL); when(application.compareInputOrderTo(any(FiCaSchedulerApp.class))).thenCallRealMethod(); return application; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimitsByPartition.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimitsByPartition.java new file mode 100644 index 00000000000..267abaf98ae --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimitsByPartition.java @@ -0,0 +1,511 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; + + +import java.io.IOException; +import java.util.ArrayList; + +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.MockAM; +import org.apache.hadoop.yarn.server.resourcemanager.MockNM; +import org.apache.hadoop.yarn.server.resourcemanager.MockRM; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; + +public class TestApplicationLimitsByPartition { + final static int GB = 1024; + + LeafQueue queue; + RMNodeLabelsManager mgr; + private YarnConfiguration conf; + + RMContext rmContext = null; + + @Before + public void setUp() throws IOException { + conf = new YarnConfiguration(); + conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, + ResourceScheduler.class); + mgr = new NullRMNodeLabelsManager(); + mgr.init(conf); + } + + private void simpleNodeLabelMappingToManager() throws IOException { + // set node -> label + mgr.addToCluserNodeLabelsWithDefaultExclusivity(ImmutableSet.of("x", "y")); + mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0), + TestUtils.toSet("x"), NodeId.newInstance("h2", 0), + TestUtils.toSet("y"))); + } + + private void complexNodeLabelMappingToManager() throws IOException { + // set node -> label + mgr.addToCluserNodeLabelsWithDefaultExclusivity(ImmutableSet.of("x", "y", + "z")); + mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0), + TestUtils.toSet("x"), NodeId.newInstance("h2", 0), + TestUtils.toSet("y"), NodeId.newInstance("h3", 0), + TestUtils.toSet("y"), NodeId.newInstance("h4", 0), + TestUtils.toSet("z"), NodeId.newInstance("h5", 0), + RMNodeLabelsManager.EMPTY_STRING_SET)); + } + + @Test(timeout = 120000) + public void testAMResourceLimitWithLabels() throws Exception { + /* + * Test Case: + * Verify AM resource limit per partition level and per queue level. So + * we use 2 queues to verify this case. + * Queue a1 supports labels (x,y). Configure am-resource-limit as 0.2 (x) + * Queue c1 supports default label. Configure am-resource-limit as 0.2 + * + * Queue A1 for label X can only support 2Gb AM resource. + * Queue C1 (empty label) can support 2Gb AM resource. + * + * Verify atleast one AM is launched, and AM resources should not go more + * than 2GB in each queue. 
+ */ + + simpleNodeLabelMappingToManager(); + CapacitySchedulerConfiguration config = (CapacitySchedulerConfiguration) + TestUtils.getConfigurationWithQueueLabels(conf); + + // After getting queue conf, configure AM resource percent for Queue A1 + // as 0.2 (Label X) and for Queue C1 as 0.2 (Empty Label) + final String A1 = CapacitySchedulerConfiguration.ROOT + ".a" + ".a1"; + final String C1 = CapacitySchedulerConfiguration.ROOT + ".c" + ".c1"; + config.setMaximumAMResourcePercentPerPartition(A1, "x", 0.2f); + config.setMaximumApplicationMasterResourcePerQueuePercent(C1, 0.2f); + + // Now inject node label manager with this updated config + MockRM rm1 = new MockRM(config) { + @Override + public RMNodeLabelsManager createNodeLabelManager() { + return mgr; + } + }; + + rm1.getRMContext().setNodeLabelManager(mgr); + rm1.start(); + MockNM nm1 = rm1.registerNode("h1:1234", 10 * GB); // label = x + rm1.registerNode("h2:1234", 10 * GB); // label = y + MockNM nm3 = rm1.registerNode("h3:1234", 10 * GB); // label = + + // Submit app1 with 1Gb AM resource to Queue A1 for label X + RMApp app1 = rm1.submitApp(GB, "app", "user", null, "a1", "x"); + MockRM.launchAndRegisterAM(app1, rm1, nm1); + + // Submit app2 with 1Gb AM resource to Queue A1 for label X + RMApp app2 = rm1.submitApp(GB, "app", "user", null, "a1", "x"); + MockRM.launchAndRegisterAM(app2, rm1, nm1); + + // Submit 3rd app to Queue A1 for label X, and this will be pending as + // AM limit is already crossed for label X. (2GB) + rm1.submitApp(GB, "app", "user", null, "a1", "x"); + + CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler(); + LeafQueue leafQueue = (LeafQueue) cs.getQueue("a1"); + Assert.assertNotNull(leafQueue); + + // Only two AMs will be activated here and the third app will remain + // pending. + Assert.assertEquals(2, leafQueue.getNumActiveApplications()); + Assert.assertEquals(1, leafQueue.getNumPendingApplications()); + + // Now verify the same test case in Queue C1 where label is not configured. + // Submit an app to Queue C1 with empty label + RMApp app3 = rm1.submitApp(GB, "app", "user", null, "c1"); + MockRM.launchAndRegisterAM(app3, rm1, nm3); + + // Submit next app to Queue C1 with empty label + RMApp app4 = rm1.submitApp(GB, "app", "user", null, "c1"); + MockRM.launchAndRegisterAM(app4, rm1, nm3); + + // Submit 3rd app to Queue C1. This will be pending as Queue's am-limit + // is reached. + rm1.submitApp(GB, "app", "user", null, "c1"); + + leafQueue = (LeafQueue) cs.getQueue("c1"); + Assert.assertNotNull(leafQueue); + + // 2 apps will be activated, third one will be pending as am-limit + // is reached. + Assert.assertEquals(2, leafQueue.getNumActiveApplications()); + Assert.assertEquals(1, leafQueue.getNumPendingApplications()); + + rm1.killApp(app3.getApplicationId()); + Thread.sleep(1000); + + // After killing one running app, the pending app will also get activated. + Assert.assertEquals(2, leafQueue.getNumActiveApplications()); + Assert.assertEquals(0, leafQueue.getNumPendingApplications()); + rm1.close(); + } + + @Test(timeout = 120000) + public void testAtleastOneAMRunPerPartition() throws Exception { + /* + * Test Case: + * Even though the am-resource-limit per queue/partition may be crossed if we + * activate an app (with a high AM resource demand), we have to activate it + * since no other apps are running in that Queue/Partition. Here also + * we run one test case for partition level and one in queue level to + * ensure no breakage in existing functionality. + * + * Queue a1 supports labels (x,y). 
Configure am-resource-limit as 0.15 (x) + * Queue c1 supports default label. Configure am-resource-limit as 0.15 + * + * Queue A1 for label X can only support 1.5Gb AM resource. + * Queue C1 (empty label) can support 1.5Gb AM resource. + * + * Verify at least one AM is launched in each Queue. + */ + simpleNodeLabelMappingToManager(); + CapacitySchedulerConfiguration config = (CapacitySchedulerConfiguration) + TestUtils.getConfigurationWithQueueLabels(conf); + + // After getting queue conf, configure AM resource percent for Queue A1 + // as 0.15 (Label X) and for Queue C1 as 0.15 (Empty Label) + final String A1 = CapacitySchedulerConfiguration.ROOT + ".a" + ".a1"; + final String C1 = CapacitySchedulerConfiguration.ROOT + ".c" + ".c1"; + config.setMaximumAMResourcePercentPerPartition(A1, "x", 0.15f); + config.setMaximumApplicationMasterResourcePerQueuePercent(C1, 0.15f); + // inject node label manager + MockRM rm1 = new MockRM(config) { + @Override + public RMNodeLabelsManager createNodeLabelManager() { + return mgr; + } + }; + + rm1.getRMContext().setNodeLabelManager(mgr); + rm1.start(); + MockNM nm1 = rm1.registerNode("h1:1234", 10 * GB); // label = x + rm1.registerNode("h2:1234", 10 * GB); // label = y + MockNM nm3 = rm1.registerNode("h3:1234", 10 * GB); // label = + + // Submit app1 (2 GB) to Queue A1 and label X + RMApp app1 = rm1.submitApp(2 * GB, "app", "user", null, "a1", "x"); + // This app must be activated even though the am-resource per-partition + // limit is only for 1.5GB. + MockRM.launchAndRegisterAM(app1, rm1, nm1); + + // Submit 2nd app to label "X" with one GB and it must be pending since + // am-resource per-partition limit is crossed (1.5 GB was the limit). + rm1.submitApp(GB, "app", "user", null, "a1", "x"); + + CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler(); + LeafQueue leafQueue = (LeafQueue) cs.getQueue("a1"); + Assert.assertNotNull(leafQueue); + + // Only 1 app will be activated as am-limit for partition "x" is 0.15 + Assert.assertEquals(1, leafQueue.getNumActiveApplications()); + Assert.assertEquals(1, leafQueue.getNumPendingApplications()); + + // Now verify the same test case in Queue C1 which takes default label + // to see queue level am-resource-limit is still working as expected. + + // Submit an app to Queue C1 with empty label (2 GB) + RMApp app3 = rm1.submitApp(2 * GB, "app", "user", null, "c1"); + // This app must be activated even though the am-resource per-queue + // limit is only for 1.5GB + MockRM.launchAndRegisterAM(app3, rm1, nm3); + + // Submit 2nd app to C1 (Default label, hence am-limit per-queue will be + // considered). + rm1.submitApp(GB, "app", "user", null, "c1"); + + leafQueue = (LeafQueue) cs.getQueue("c1"); + Assert.assertNotNull(leafQueue); + + // 1 app will be activated (and it has AM resource more than queue limit) + Assert.assertEquals(1, leafQueue.getNumActiveApplications()); + Assert.assertEquals(1, leafQueue.getNumPendingApplications()); + rm1.close(); + } + + @Test(timeout = 120000) + public void testDefaultAMLimitFromQueueForPartition() throws Exception { + /* + * Test Case: + * Configure AM resource limit per queue level. If partition level config + * is not found, we will be considering per-queue level am-limit. Ensure + * this is working as expected. + * + * Queue A1 am-resource limit to be configured as 0.2 (not for partition x) + * + * Even though per-partition level config is not done, CS should consider + * the configuration done for queue level. 
+ */ + simpleNodeLabelMappingToManager(); + CapacitySchedulerConfiguration config = (CapacitySchedulerConfiguration) + TestUtils.getConfigurationWithQueueLabels(conf); + + // After getting queue conf, configure AM resource percent for Queue A1 + // as 0.2 (not for partition, rather in queue level) + final String A1 = CapacitySchedulerConfiguration.ROOT + ".a" + ".a1"; + config.setMaximumApplicationMasterResourcePerQueuePercent(A1, 0.2f); + // inject node label manager + MockRM rm1 = new MockRM(config) { + @Override + public RMNodeLabelsManager createNodeLabelManager() { + return mgr; + } + }; + + rm1.getRMContext().setNodeLabelManager(mgr); + rm1.start(); + MockNM nm1 = rm1.registerNode("h1:1234", 10 * GB); // label = x + rm1.registerNode("h2:1234", 10 * GB); // label = y + rm1.registerNode("h3:1234", 10 * GB); // label = + + // Submit app1 (2 GB) to Queue A1 and label X + RMApp app1 = rm1.submitApp(2 * GB, "app", "user", null, "a1", "x"); + MockRM.launchAndRegisterAM(app1, rm1, nm1); + + // Submit 2nd app to label "X" with one GB. Since queue am-limit is 2GB, + // 2nd app will be pending and first one will get activated. + rm1.submitApp(GB, "app", "user", null, "a1", "x"); + + CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler(); + LeafQueue leafQueue = (LeafQueue) cs.getQueue("a1"); + Assert.assertNotNull(leafQueue); + + // Only 1 app will be activated as am-limit for queue is 0.2 and same is + // used for partition "x" also. + Assert.assertEquals(1, leafQueue.getNumActiveApplications()); + Assert.assertEquals(1, leafQueue.getNumPendingApplications()); + rm1.close(); + } + + @Test(timeout = 120000) + public void testUserAMResourceLimitWithLabels() throws Exception { + /* + * Test Case: + * Verify user level AM resource limit. This test case is ran with two + * users. And per-partition level am-resource-limit will be 0.4, which + * internally will be 4GB. Hence 2GB will be available for each + * user for its AM resource. + * + * Now this test case will create a scenario where AM resource limit per + * partition is not met, but user level am-resource limit is reached. + * Hence app will be pending. + */ + + final String user_0 = "user_0"; + final String user_1 = "user_1"; + simpleNodeLabelMappingToManager(); + CapacitySchedulerConfiguration config = (CapacitySchedulerConfiguration) + TestUtils.getConfigurationWithQueueLabels(conf); + + // After getting queue conf, configure AM resource percent for Queue A1 + // as 0.4 (Label X). Also set userlimit as 50% for this queue. So when we + // have two users submitting applications, each user will get 50% of AM + // resource which is available in this partition. 
+ final String A1 = CapacitySchedulerConfiguration.ROOT + ".a" + ".a1"; + config.setMaximumAMResourcePercentPerPartition(A1, "x", 0.4f); + config.setUserLimit(A1, 50); + + // Now inject node label manager with this updated config + MockRM rm1 = new MockRM(config) { + @Override + public RMNodeLabelsManager createNodeLabelManager() { + return mgr; + } + }; + + rm1.getRMContext().setNodeLabelManager(mgr); + rm1.start(); + MockNM nm1 = rm1.registerNode("h1:1234", 10 * GB); // label = x + rm1.registerNode("h2:1234", 10 * GB); // label = y + rm1.registerNode("h3:1234", 10 * GB); // label = + + // Submit app1 with 1Gb AM resource to Queue A1 for label X for user0 + RMApp app1 = rm1.submitApp(GB, "app", user_0, null, "a1", "x"); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); + + // Place few allocate requests to make it an active application + am1.allocate("*", 1 * GB, 15, new ArrayList(), ""); + + // Now submit 2nd app to Queue A1 for label X for user1 + RMApp app2 = rm1.submitApp(GB, "app", user_1, null, "a1", "x"); + MockRM.launchAndRegisterAM(app2, rm1, nm1); + + CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler(); + LeafQueue leafQueue = (LeafQueue) cs.getQueue("a1"); + Assert.assertNotNull(leafQueue); + + // Verify active applications count in this queue. + Assert.assertEquals(2, leafQueue.getNumActiveApplications()); + Assert.assertEquals(1, leafQueue.getNumActiveApplications(user_0)); + Assert.assertEquals(0, leafQueue.getNumPendingApplications()); + + // Submit 3rd app to Queue A1 for label X for user1. Now user1 will have + // 2 applications (2 GB resource) and user0 will have one app (1GB). + RMApp app3 = rm1.submitApp(GB, "app", user_1, null, "a1", "x"); + MockAM am2 = MockRM.launchAndRegisterAM(app3, rm1, nm1); + + // Place few allocate requests to make it an active application. This is + // to ensure that user1 and user0 are active users. + am2.allocate("*", 1 * GB, 10, new ArrayList(), ""); + + // Submit final app to Queue A1 for label X. Since we are trying to submit + // for user1, we need 3Gb resource for AMs. + // 4Gb -> 40% of label "X" in queue A1 + // Since we have 2 users, 50% of 4Gb will be max for each user. Here user1 + // has already crossed this 2GB limit, hence this app will be pending. + rm1.submitApp(GB, "app", user_1, null, "a1", "x"); + + // Verify active applications count per user and also in queue level. + Assert.assertEquals(3, leafQueue.getNumActiveApplications()); + Assert.assertEquals(1, leafQueue.getNumActiveApplications(user_0)); + Assert.assertEquals(2, leafQueue.getNumActiveApplications(user_1)); + Assert.assertEquals(1, leafQueue.getNumPendingApplications(user_1)); + Assert.assertEquals(1, leafQueue.getNumPendingApplications()); + rm1.close(); + } + + @Test + public void testAMResourceLimitForMultipleApplications() throws Exception { + /* + * Test Case: + * In a complex node label setup, verify am-resource-percentage calculation + * and check whether applications can get activated as per expectation. 
+ */ + complexNodeLabelMappingToManager(); + CapacitySchedulerConfiguration config = (CapacitySchedulerConfiguration) + TestUtils.getComplexConfigurationWithQueueLabels(conf); + + /* + * Queue structure: + * root (*) + * ________________ + * / \ + * a x(100%), y(50%) b y(50%), z(100%) + * ________________ ______________ + * / / \ + * a1 (x,y) b1(no) b2(y,z) + * 100% y = 100%, z = 100% + * + * Node structure: + * h1 : x + * h2 : y + * h3 : y + * h4 : z + * h5 : NO + * + * Total resource: + * x: 10G + * y: 20G + * z: 10G + * *: 10G + * + * AM resource percentage config: + * A1 : 0.25 + * B2 : 0.15 + */ + final String A1 = CapacitySchedulerConfiguration.ROOT + ".a" + ".a1"; + final String B1 = CapacitySchedulerConfiguration.ROOT + ".b" + ".b1"; + config.setMaximumAMResourcePercentPerPartition(A1, "y", 0.25f); + config.setMaximumApplicationMasterResourcePerQueuePercent(B1, 0.15f); + + // Now inject node label manager with this updated config + MockRM rm1 = new MockRM(config) { + @Override + public RMNodeLabelsManager createNodeLabelManager() { + return mgr; + } + }; + + rm1.getRMContext().setNodeLabelManager(mgr); + rm1.start(); + rm1.registerNode("h1:1234", 10 * GB); // label = x + MockNM nm2 = rm1.registerNode("h2:1234", 10 * GB); // label = y + MockNM nm3 = rm1.registerNode("h3:1234", 10 * GB); // label = y + rm1.registerNode("h4:1234", 10 * GB); // label = z + MockNM nm5 = rm1.registerNode("h5:1234", 10 * GB); // label = + + // Submit app1 with 2Gb AM resource to Queue A1 for label Y + RMApp app1 = rm1.submitApp(2 * GB, "app", "user", null, "a1", "y"); + MockRM.launchAndRegisterAM(app1, rm1, nm2); + + // Submit app2 with 1Gb AM resource to Queue A1 for label Y + RMApp app2 = rm1.submitApp(GB, "app", "user", null, "a1", "y"); + MockRM.launchAndRegisterAM(app2, rm1, nm3); + + // Submit another app with 1Gb AM resource to Queue A1 for label Y + rm1.submitApp(GB, "app", "user", null, "a1", "y"); + + CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler(); + LeafQueue leafQueue = (LeafQueue) cs.getQueue("a1"); + Assert.assertNotNull(leafQueue); + + /* + * capacity of queue A -> 50% for label Y + * capacity of queue A1 -> 100% for label Y + * + * Total resources available for label Y -> 20GB (nm2 and nm3) + * Hence in queue A1, max resource for label Y is 10GB. + * + * AM resource percent config for queue A1 -> 0.25 + * ==> 2.5Gb (3 Gb) is max-am-resource-limit + */ + Assert.assertEquals(2, leafQueue.getNumActiveApplications()); + Assert.assertEquals(1, leafQueue.getNumPendingApplications()); + + // Submit app3 with 1Gb AM resource to Queue B1 (no_label) + RMApp app3 = rm1.submitApp(GB, "app", "user", null, "b1"); + MockRM.launchAndRegisterAM(app3, rm1, nm5); + + // Submit another app with 1Gb AM resource to Queue B1 (no_label) + rm1.submitApp(GB, "app", "user", null, "b1"); + + leafQueue = (LeafQueue) cs.getQueue("b1"); + Assert.assertNotNull(leafQueue); + + /* + * capacity of queue B -> 90% for queue + * -> and 100% for no-label + * capacity of queue B1 -> 50% for no-label/queue + * + * Total resources available for no-label -> 10GB (nm5) + * Hence in queue B1, max resource for no-label is 5GB. + * + * AM resource percent config for queue B1 -> 0.15 + * ==> 1Gb is max-am-resource-limit + * + * Only one app will be activated and all othe will be pending. 
+ */ + Assert.assertEquals(1, leafQueue.getNumActiveApplications()); + Assert.assertEquals(1, leafQueue.getNumPendingApplications()); + + rm1.close(); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNodeLabelUpdate.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNodeLabelUpdate.java index 94af4e0bffe..fe24b2d736a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNodeLabelUpdate.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNodeLabelUpdate.java @@ -95,7 +95,11 @@ public class TestCapacitySchedulerNodeLabelUpdate { private void checkUsedResource(MockRM rm, String queueName, int memory) { checkUsedResource(rm, queueName, memory, RMNodeLabelsManager.NO_LABEL); } - + + private void checkAMUsedResource(MockRM rm, String queueName, int memory) { + checkAMUsedResource(rm, queueName, memory, RMNodeLabelsManager.NO_LABEL); + } + private void checkUsedResource(MockRM rm, String queueName, int memory, String label) { CapacityScheduler scheduler = (CapacityScheduler) rm.getResourceScheduler(); @@ -104,6 +108,14 @@ public class TestCapacitySchedulerNodeLabelUpdate { .getMemory()); } + private void checkAMUsedResource(MockRM rm, String queueName, int memory, + String label) { + CapacityScheduler scheduler = (CapacityScheduler) rm.getResourceScheduler(); + CSQueue queue = scheduler.getQueue(queueName); + Assert.assertEquals(memory, queue.getQueueResourceUsage().getAMUsed(label) + .getMemory()); + } + private void checkUserUsedResource(MockRM rm, String queueName, String userName, String partition, int memory) { CapacityScheduler scheduler = (CapacityScheduler) rm.getResourceScheduler(); @@ -424,4 +436,93 @@ public class TestCapacitySchedulerNodeLabelUpdate { rm.close(); } + + @Test (timeout = 60000) + public void testAMResourceUsageWhenNodeUpdatesPartition() + throws Exception { + // set node -> label + mgr.addToCluserNodeLabelsWithDefaultExclusivity(ImmutableSet.of("x", "y", "z")); + + // set mapping: + // h1 -> x + // h2 -> y + mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x"))); + mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h2", 0), toSet("y"))); + + // inject node label manager + MockRM rm = new MockRM(getConfigurationWithQueueLabels(conf)) { + @Override + public RMNodeLabelsManager createNodeLabelManager() { + return mgr; + } + }; + + rm.getRMContext().setNodeLabelManager(mgr); + rm.start(); + MockNM nm1 = rm.registerNode("h1:1234", 8000); + rm.registerNode("h2:1234", 8000); + rm.registerNode("h3:1234", 8000); + + ContainerId containerId2; + + // launch an app to queue a1 (label = x), and check all container will + // be allocated in h1 + RMApp app1 = rm.submitApp(GB, "app", "user", null, "a", "x"); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1); + + // request a container. 
+ am1.allocate("*", GB, 1, new ArrayList(), "x"); + ContainerId.newContainerId(am1.getApplicationAttemptId(), 1); + containerId2 = ContainerId.newContainerId(am1.getApplicationAttemptId(), 2); + Assert.assertTrue(rm.waitForState(nm1, containerId2, + RMContainerState.ALLOCATED, 10 * 1000)); + + // check used resource: + // queue-a used x=2G + checkUsedResource(rm, "a", 2048, "x"); + checkAMUsedResource(rm, "a", 1024, "x"); + + CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler(); + FiCaSchedulerApp app = cs.getApplicationAttempt(am1.getApplicationAttemptId()); + + // change h1's label to z + cs.handle(new NodeLabelsUpdateSchedulerEvent(ImmutableMap.of(nm1.getNodeId(), + toSet("z")))); + + // Now the resources also should change from x to z. Verify AM and normal + // used resource are successfully changed. + checkUsedResource(rm, "a", 0, "x"); + checkUsedResource(rm, "a", 2048, "z"); + checkAMUsedResource(rm, "a", 0, "x"); + checkAMUsedResource(rm, "a", 1024, "z"); + checkUserUsedResource(rm, "a", "user", "x", 0); + checkUserUsedResource(rm, "a", "user", "z", 2048); + Assert.assertEquals(0, + app.getAppAttemptResourceUsage().getAMUsed("x").getMemory()); + Assert.assertEquals(1024, + app.getAppAttemptResourceUsage().getAMUsed("z").getMemory()); + + // change h1's label to no label + Set emptyLabels = new HashSet<>(); + Map> map = ImmutableMap.of(nm1.getNodeId(), + emptyLabels); + cs.handle(new NodeLabelsUpdateSchedulerEvent(map)); + checkUsedResource(rm, "a", 0, "x"); + checkUsedResource(rm, "a", 0, "z"); + checkUsedResource(rm, "a", 2048); + checkAMUsedResource(rm, "a", 0, "x"); + checkAMUsedResource(rm, "a", 0, "z"); + checkAMUsedResource(rm, "a", 1024); + checkUserUsedResource(rm, "a", "user", "x", 0); + checkUserUsedResource(rm, "a", "user", "z", 0); + checkUserUsedResource(rm, "a", "user", "", 2048); + Assert.assertEquals(0, + app.getAppAttemptResourceUsage().getAMUsed("x").getMemory()); + Assert.assertEquals(0, + app.getAppAttemptResourceUsage().getAMUsed("z").getMemory()); + Assert.assertEquals(1024, + app.getAppAttemptResourceUsage().getAMUsed("").getMemory()); + + rm.close(); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java index 5ffcaadd458..489ef7711d4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java @@ -221,13 +221,13 @@ public class TestUtils { when(container.getPriority()).thenReturn(priority); return container; } - + @SuppressWarnings("unchecked") - private static Set toSet(E... elements) { + public static Set toSet(E... elements) { Set set = Sets.newHashSet(elements); return set; } - + /** * Get a queue structure: *