YARN-3216. Max-AM-Resource-Percentage should respect node labels. (Sunil G via wangda)

This commit is contained in:
Wangda Tan 2015-10-26 16:44:39 -07:00
parent 6f606214e7
commit 56e4f6237a
12 changed files with 905 additions and 109 deletions

View File

@ -546,6 +546,9 @@ Release 2.8.0 - UNRELEASED
YARN-4285. Display resource usage as percentage of queue and cluster in the
RM UI (Varun Vasudev via wangda)
YARN-3216. Max-AM-Resource-Percentage should respect node labels.
(Sunil G via wangda)
OPTIMIZATIONS
YARN-3339. TestDockerContainerExecutor should pull a single image and not

View File

@ -47,6 +47,7 @@ import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
import org.apache.hadoop.yarn.server.api.ContainerType;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AggregateAppResourceUsage;
@ -146,7 +147,9 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
protected Queue queue;
protected boolean isStopped = false;
private String appAMNodePartitionName = CommonNodeLabelsManager.NO_LABEL;
protected final RMContext rmContext;
public SchedulerApplicationAttempt(ApplicationAttemptId applicationAttemptId,
@ -247,10 +250,18 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
return attemptResourceUsage.getAMUsed();
}
public Resource getAMResource(String label) {
return attemptResourceUsage.getAMUsed(label);
}
public void setAMResource(Resource amResource) {
attemptResourceUsage.setAMUsed(amResource);
}
public void setAMResource(String label, Resource amResource) {
attemptResourceUsage.setAMUsed(label, amResource);
}
public boolean isAmRunning() {
return amRunning;
}
@ -886,4 +897,12 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
SchedContainerChangeRequest increaseRequest) {
changeContainerResource(increaseRequest, true);
}
public void setAppAMNodePartitionName(String partitionName) {
this.appAMNodePartitionName = partitionName;
}
public String getAppAMNodePartitionName() {
return appAMNodePartitionName;
}
}

View File

@ -138,11 +138,16 @@ class CSQueueUtils {
csConf.getNonLabeledQueueCapacity(queuePath) / 100);
queueCapacities.setMaximumCapacity(CommonNodeLabelsManager.NO_LABEL,
csConf.getNonLabeledQueueMaximumCapacity(queuePath) / 100);
queueCapacities.setMaxAMResourcePercentage(
CommonNodeLabelsManager.NO_LABEL,
csConf.getMaximumAMResourcePercentPerPartition(queuePath, label));
} else {
queueCapacities.setCapacity(label,
csConf.getLabeledQueueCapacity(queuePath, label) / 100);
queueCapacities.setMaximumCapacity(label,
csConf.getLabeledQueueMaximumCapacity(queuePath, label) / 100);
queueCapacities.setMaxAMResourcePercentage(label,
csConf.getMaximumAMResourcePercentPerPartition(queuePath, label));
}
}
}

View File

@ -85,7 +85,7 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
@Private
public static final String MAXIMUM_APPLICATION_MASTERS_RESOURCE_PERCENT =
PREFIX + MAXIMUM_AM_RESOURCE_SUFFIX;
@Private
public static final String QUEUES = "queues";
@ -138,7 +138,7 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
@Private
public static final float
DEFAULT_MAXIMUM_APPLICATIONMASTERS_RESOURCE_PERCENT = 0.1f;
@Private
public static final float UNDEFINED = -1;
@ -514,6 +514,21 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
set(getQueuePrefix(queue) + DEFAULT_NODE_LABEL_EXPRESSION, exp);
}
public float getMaximumAMResourcePercentPerPartition(String queue,
String label) {
// If per-partition max-am-resource-percent is not configured,
// use default value as max-am-resource-percent for this queue.
return getFloat(getNodeLabelPrefix(queue, label)
+ MAXIMUM_AM_RESOURCE_SUFFIX,
getMaximumApplicationMasterResourcePerQueuePercent(queue));
}
public void setMaximumAMResourcePercentPerPartition(String queue,
String label, float percent) {
setFloat(getNodeLabelPrefix(queue, label)
+ MAXIMUM_AM_RESOURCE_SUFFIX, percent);
}
/*
* Returns whether we should continue to look at all heart beating nodes even
* after the reservation limit was hit. The node heart beating in could

View File

@ -84,7 +84,7 @@ public class LeafQueue extends AbstractCSQueue {
protected int maxApplicationsPerUser;
private float maxAMResourcePerQueuePercent;
private volatile int nodeLocalityDelay;
Map<ApplicationAttemptId, FiCaSchedulerApp> applicationAttemptMap =
@ -122,8 +122,8 @@ public class LeafQueue extends AbstractCSQueue {
// preemption, key is the partition of the RMContainer allocated on
private Map<String, TreeSet<RMContainer>> ignorePartitionExclusivityRMContainers =
new HashMap<>();
public LeafQueue(CapacitySchedulerContext cs,
public LeafQueue(CapacitySchedulerContext cs,
String queueName, CSQueue parent, CSQueue old) throws IOException {
super(cs, queueName, parent, old);
this.scheduler = cs;
@ -538,79 +538,120 @@ public class LeafQueue extends AbstractCSQueue {
}
public synchronized Resource getAMResourceLimit() {
/*
* The limit to the amount of resources which can be consumed by
* application masters for applications running in the queue
* is calculated by taking the greater of the max resources currently
* available to the queue (see absoluteMaxAvailCapacity) and the absolute
* resources guaranteed for the queue and multiplying it by the am
* resource percent.
*
* This is to allow a queue to grow its (proportional) application
* master resource use up to its max capacity when other queues are
* idle but to scale back down to it's guaranteed capacity as they
* become busy.
*
*/
Resource queueCurrentLimit;
synchronized (queueResourceLimitsInfo) {
queueCurrentLimit = queueResourceLimitsInfo.getQueueCurrentLimit();
}
Resource queueCap = Resources.max(resourceCalculator, lastClusterResource,
absoluteCapacityResource, queueCurrentLimit);
Resource amResouceLimit =
Resources.multiplyAndNormalizeUp(resourceCalculator, queueCap,
maxAMResourcePerQueuePercent, minimumAllocation);
return getAMResourceLimitPerPartition(RMNodeLabelsManager.NO_LABEL);
}
public synchronized Resource getUserAMResourceLimit() {
return getUserAMResourceLimitPerPartition(RMNodeLabelsManager.NO_LABEL);
}
public synchronized Resource getUserAMResourceLimitPerPartition(
String nodePartition) {
/*
* The user am resource limit is based on the same approach as the user
* limit (as it should represent a subset of that). This means that it uses
* the absolute queue capacity (per partition) instead of the max and is
* modified by the userlimit and the userlimit factor as is the userlimit
*/
float effectiveUserLimit = Math.max(userLimit / 100.0f,
1.0f / Math.max(getActiveUsersManager().getNumActiveUsers(), 1));
Resource queuePartitionResource = Resources.multiplyAndNormalizeUp(
resourceCalculator,
labelManager.getResourceByLabel(nodePartition, lastClusterResource),
queueCapacities.getAbsoluteCapacity(nodePartition), minimumAllocation);
return Resources.multiplyAndNormalizeUp(resourceCalculator,
queuePartitionResource,
queueCapacities.getMaxAMResourcePercentage(nodePartition)
* effectiveUserLimit * userLimitFactor, minimumAllocation);
}
public synchronized Resource getAMResourceLimitPerPartition(
String nodePartition) {
/*
* For non-labeled partition, get the max value from resources currently
* available to the queue and the absolute resources guaranteed for the
* partition in the queue. For labeled partition, consider only the absolute
* resources guaranteed. Multiply this value (based on labeled/
* non-labeled), * with per-partition am-resource-percent to get the max am
* resource limit for this queue and partition.
*/
Resource queuePartitionResource = Resources.multiplyAndNormalizeUp(
resourceCalculator,
labelManager.getResourceByLabel(nodePartition, lastClusterResource),
queueCapacities.getAbsoluteCapacity(nodePartition), minimumAllocation);
Resource queueCurrentLimit = Resources.none();
// For non-labeled partition, we need to consider the current queue
// usage limit.
if (nodePartition.equals(RMNodeLabelsManager.NO_LABEL)) {
synchronized (queueResourceLimitsInfo) {
queueCurrentLimit = queueResourceLimitsInfo.getQueueCurrentLimit();
}
}
float amResourcePercent = queueCapacities
.getMaxAMResourcePercentage(nodePartition);
// Current usable resource for this queue and partition is the max of
// queueCurrentLimit and queuePartitionResource.
Resource queuePartitionUsableResource = Resources.max(resourceCalculator,
lastClusterResource, queueCurrentLimit, queuePartitionResource);
Resource amResouceLimit = Resources.multiplyAndNormalizeUp(
resourceCalculator, queuePartitionUsableResource, amResourcePercent,
minimumAllocation);
metrics.setAMResouceLimit(amResouceLimit);
return amResouceLimit;
}
public synchronized Resource getUserAMResourceLimit() {
/*
* The user amresource limit is based on the same approach as the
* user limit (as it should represent a subset of that). This means that
* it uses the absolute queue capacity instead of the max and is modified
* by the userlimit and the userlimit factor as is the userlimit
*
*/
float effectiveUserLimit = Math.max(userLimit / 100.0f, 1.0f /
Math.max(getActiveUsersManager().getNumActiveUsers(), 1));
return Resources.multiplyAndNormalizeUp(
resourceCalculator,
absoluteCapacityResource,
maxAMResourcePerQueuePercent * effectiveUserLimit *
userLimitFactor, minimumAllocation);
}
private synchronized void activateApplications() {
//limit of allowed resource usage for application masters
Resource amLimit = getAMResourceLimit();
Resource userAMLimit = getUserAMResourceLimit();
// limit of allowed resource usage for application masters
Map<String, Resource> amPartitionLimit = new HashMap<String, Resource>();
Map<String, Resource> userAmPartitionLimit =
new HashMap<String, Resource>();
for (Iterator<FiCaSchedulerApp> i = getPendingAppsOrderingPolicy()
.getAssignmentIterator(); i.hasNext();) {
FiCaSchedulerApp application = i.next();
ApplicationId applicationId = application.getApplicationId();
// Check am resource limit
Resource amIfStarted =
Resources.add(application.getAMResource(), queueUsage.getAMUsed());
if (LOG.isDebugEnabled()) {
LOG.debug("application AMResource " + application.getAMResource() +
" maxAMResourcePerQueuePercent " + maxAMResourcePerQueuePercent +
" amLimit " + amLimit +
" lastClusterResource " + lastClusterResource +
" amIfStarted " + amIfStarted);
// Get the am-node-partition associated with each application
// and calculate max-am resource limit for this partition.
String partitionName = application.getAppAMNodePartitionName();
Resource amLimit = amPartitionLimit.get(partitionName);
// Verify whether we already calculated am-limit for this label.
if (amLimit == null) {
amLimit = getAMResourceLimitPerPartition(partitionName);
amPartitionLimit.put(partitionName, amLimit);
}
if (!Resources.lessThanOrEqual(
resourceCalculator, lastClusterResource, amIfStarted, amLimit)) {
if (getNumActiveApplications() < 1) {
LOG.warn("maximum-am-resource-percent is insufficient to start a" +
" single application in queue, it is likely set too low." +
" skipping enforcement to allow at least one application to start");
// Check am resource limit.
Resource amIfStarted = Resources.add(
application.getAMResource(partitionName),
queueUsage.getAMUsed(partitionName));
if (LOG.isDebugEnabled()) {
LOG.debug("application AMResource "
+ application.getAMResource(partitionName)
+ " maxAMResourcePerQueuePercent " + maxAMResourcePerQueuePercent
+ " amLimit " + amLimit + " lastClusterResource "
+ lastClusterResource + " amIfStarted " + amIfStarted
+ " AM node-partition name " + partitionName);
}
if (!Resources.lessThanOrEqual(resourceCalculator, lastClusterResource,
amIfStarted, amLimit)) {
if (getNumActiveApplications() < 1
|| (Resources.lessThanOrEqual(resourceCalculator,
lastClusterResource, queueUsage.getAMUsed(partitionName),
Resources.none()))) {
LOG.warn("maximum-am-resource-percent is insufficient to start a"
+ " single application in queue, it is likely set too low."
+ " skipping enforcement to allow at least one application"
+ " to start");
} else {
LOG.info("Not activating application " + applicationId
+ " as amIfStarted: " + amIfStarted + " exceeds amLimit: "
@ -618,22 +659,31 @@ public class LeafQueue extends AbstractCSQueue {
continue;
}
}
// Check user am resource limit
User user = getUser(application.getUser());
Resource userAmIfStarted =
Resources.add(application.getAMResource(),
user.getConsumedAMResources());
if (!Resources.lessThanOrEqual(
resourceCalculator, lastClusterResource, userAmIfStarted,
userAMLimit)) {
if (getNumActiveApplications() < 1) {
LOG.warn("maximum-am-resource-percent is insufficient to start a" +
" single application in queue for user, it is likely set too low." +
" skipping enforcement to allow at least one application to start");
Resource userAMLimit = userAmPartitionLimit.get(partitionName);
// Verify whether we already calculated user-am-limit for this label.
if (userAMLimit == null) {
userAMLimit = getUserAMResourceLimitPerPartition(partitionName);
userAmPartitionLimit.put(partitionName, userAMLimit);
}
Resource userAmIfStarted = Resources.add(
application.getAMResource(partitionName),
user.getConsumedAMResources(partitionName));
if (!Resources.lessThanOrEqual(resourceCalculator, lastClusterResource,
userAmIfStarted, userAMLimit)) {
if (getNumActiveApplications() < 1
|| (Resources.lessThanOrEqual(resourceCalculator,
lastClusterResource, queueUsage.getAMUsed(partitionName),
Resources.none()))) {
LOG.warn("maximum-am-resource-percent is insufficient to start a"
+ " single application in queue for user, it is likely set too"
+ " low. skipping enforcement to allow at least one application"
+ " to start");
} else {
LOG.info("Not activating application " + applicationId
+ " for user: " + user + " as userAmIfStarted: "
@ -643,9 +693,12 @@ public class LeafQueue extends AbstractCSQueue {
}
user.activateApplication();
orderingPolicy.addSchedulableEntity(application);
queueUsage.incAMUsed(application.getAMResource());
user.getResourceUsage().incAMUsed(application.getAMResource());
metrics.incAMUsed(application.getUser(), application.getAMResource());
queueUsage.incAMUsed(partitionName,
application.getAMResource(partitionName));
user.getResourceUsage().incAMUsed(partitionName,
application.getAMResource(partitionName));
metrics.incAMUsed(application.getUser(),
application.getAMResource(partitionName));
metrics.setAMResouceLimitForUser(application.getUser(), userAMLimit);
i.remove();
LOG.info("Application " + applicationId + " from user: "
@ -693,13 +746,16 @@ public class LeafQueue extends AbstractCSQueue {
public synchronized void removeApplicationAttempt(
FiCaSchedulerApp application, User user) {
String partitionName = application.getAppAMNodePartitionName();
boolean wasActive =
orderingPolicy.removeSchedulableEntity(application);
if (!wasActive) {
pendingOrderingPolicy.removeSchedulableEntity(application);
} else {
queueUsage.decAMUsed(application.getAMResource());
user.getResourceUsage().decAMUsed(application.getAMResource());
queueUsage.decAMUsed(partitionName,
application.getAMResource(partitionName));
user.getResourceUsage().decAMUsed(partitionName,
application.getAMResource(partitionName));
metrics.decAMUsed(application.getUser(), application.getAMResource());
}
applicationAttemptMap.remove(application.getApplicationAttemptId());
@ -1328,6 +1384,22 @@ public class LeafQueue extends AbstractCSQueue {
super.decUsedResource(nodeLabel, resourceToDec, application);
}
public void incAMUsedResource(String nodeLabel, Resource resourceToInc,
SchedulerApplicationAttempt application) {
getUser(application.getUser()).getResourceUsage().incAMUsed(nodeLabel,
resourceToInc);
// ResourceUsage has its own lock, no addition lock needs here.
queueUsage.incAMUsed(nodeLabel, resourceToInc);
}
public void decAMUsedResource(String nodeLabel, Resource resourceToDec,
SchedulerApplicationAttempt application) {
getUser(application.getUser()).getResourceUsage().decAMUsed(nodeLabel,
resourceToDec);
// ResourceUsage has its own lock, no addition lock needs here.
queueUsage.decAMUsed(nodeLabel, resourceToDec);
}
@VisibleForTesting
public static class User {
ResourceUsage userResourceUsage = new ResourceUsage();
@ -1363,6 +1435,10 @@ public class LeafQueue extends AbstractCSQueue {
return userResourceUsage.getAMUsed();
}
public Resource getConsumedAMResources(String label) {
return userResourceUsage.getAMUsed(label);
}
public int getTotalApplications() {
return getPendingApplications() + getActiveApplications();
}

View File

@ -49,7 +49,8 @@ public class QueueCapacities {
// Usage enum here to make implement cleaner
private enum CapacityType {
USED_CAP(0), ABS_USED_CAP(1), MAX_CAP(2), ABS_MAX_CAP(3), CAP(4), ABS_CAP(5);
USED_CAP(0), ABS_USED_CAP(1), MAX_CAP(2), ABS_MAX_CAP(3), CAP(4), ABS_CAP(5),
MAX_AM_PERC(6);
private int idx;
@ -74,6 +75,7 @@ public class QueueCapacities {
sb.append("abs_max_cap=" + capacitiesArr[3] + "%, ");
sb.append("cap=" + capacitiesArr[4] + "%, ");
sb.append("abs_cap=" + capacitiesArr[5] + "%}");
sb.append("max_am_perc=" + capacitiesArr[6] + "%}");
return sb.toString();
}
}
@ -213,7 +215,16 @@ public class QueueCapacities {
public void setAbsoluteMaximumCapacity(String label, float value) {
_set(label, CapacityType.ABS_MAX_CAP, value);
}
/* Absolute Maximum AM resource percentage Getter and Setter */
public float getMaxAMResourcePercentage(String label) {
return _get(label, CapacityType.MAX_AM_PERC);
}
public void setMaxAMResourcePercentage(String label, float value) {
_set(label, CapacityType.MAX_AM_PERC, value);
}
/**
* Clear configurable fields, like
* (absolute)capacity/(absolute)maximum-capacity, this will be used by queue

View File

@ -37,6 +37,7 @@ import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger;
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
@ -99,18 +100,26 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
super(applicationAttemptId, user, queue, activeUsersManager, rmContext);
RMApp rmApp = rmContext.getRMApps().get(getApplicationId());
Resource amResource;
String partition;
if (rmApp == null || rmApp.getAMResourceRequest() == null) {
//the rmApp may be undefined (the resource manager checks for this too)
//and unmanaged applications do not provide an amResource request
//in these cases, provide a default using the scheduler
// the rmApp may be undefined (the resource manager checks for this too)
// and unmanaged applications do not provide an amResource request
// in these cases, provide a default using the scheduler
amResource = rmContext.getScheduler().getMinimumResourceCapability();
partition = CommonNodeLabelsManager.NO_LABEL;
} else {
amResource = rmApp.getAMResourceRequest().getCapability();
partition =
(rmApp.getAMResourceRequest().getNodeLabelExpression() == null)
? CommonNodeLabelsManager.NO_LABEL
: rmApp.getAMResourceRequest().getNodeLabelExpression();
}
setAMResource(amResource);
setAppAMNodePartitionName(partition);
setAMResource(partition, amResource);
setPriority(appPriority);
scheduler = rmContext.getScheduler();
@ -143,8 +152,8 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
containersToPreempt.remove(rmContainer.getContainerId());
RMAuditLogger.logSuccess(getUser(),
AuditConstants.RELEASE_CONTAINER, "SchedulerApp",
RMAuditLogger.logSuccess(getUser(),
AuditConstants.RELEASE_CONTAINER, "SchedulerApp",
getApplicationId(), containerId);
// Update usage metrics
@ -185,10 +194,10 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
// Update consumption and track allocations
List<ResourceRequest> resourceRequestList = appSchedulingInfo.allocate(
type, node, priority, request, container);
attemptResourceUsage.incUsed(node.getPartition(),
container.getResource());
// Update resource requests related to "request" and store in RMContainer
attemptResourceUsage.incUsed(node.getPartition(), container.getResource());
// Update resource requests related to "request" and store in RMContainer
((RMContainerImpl)rmContainer).setResourceRequests(resourceRequestList);
// Inform the container
@ -201,8 +210,8 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
+ " container=" + container.getId() + " host="
+ container.getNodeId().getHost() + " type=" + type);
}
RMAuditLogger.logSuccess(getUser(),
AuditConstants.ALLOC_CONTAINER, "SchedulerApp",
RMAuditLogger.logSuccess(getUser(),
AuditConstants.ALLOC_CONTAINER, "SchedulerApp",
getApplicationId(), container.getId());
return rmContainer;
@ -483,5 +492,14 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
this.attemptResourceUsage.incUsed(newPartition, containerResource);
getCSLeafQueue().decUsedResource(oldPartition, containerResource, this);
getCSLeafQueue().incUsedResource(newPartition, containerResource, this);
// Update new partition name if container is AM and also update AM resource
if (rmContainer.isAMContainer()) {
setAppAMNodePartitionName(newPartition);
this.attemptResourceUsage.decAMUsed(oldPartition, containerResource);
this.attemptResourceUsage.incAMUsed(newPartition, containerResource);
getCSLeafQueue().decAMUsedResource(oldPartition, containerResource, this);
getCSLeafQueue().incAMUsedResource(newPartition, containerResource, this);
}
}
}

View File

@ -56,6 +56,7 @@ import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.LogAggregationContext;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.SignalContainerCommand;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeState;
@ -354,7 +355,19 @@ public class MockRM extends ResourceManager {
super.getConfig().getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null);
}
public RMApp submitApp(int masterMemory, String name, String user,
Map<ApplicationAccessType, String> acls, String queue, String amLabel)
throws Exception {
Resource resource = Records.newRecord(Resource.class);
resource.setMemory(masterMemory);
Priority priority = Priority.newInstance(0);
return submitApp(resource, name, user, acls, false, queue,
super.getConfig().getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null, null, true, false,
false, null, 0, null, true, priority, amLabel);
}
public RMApp submitApp(Resource resource, String name, String user,
Map<ApplicationAccessType, String> acls, String queue) throws Exception {
return submitApp(resource, name, user, acls, false, queue,
@ -449,7 +462,20 @@ public class MockRM extends ResourceManager {
boolean waitForAccepted, boolean keepContainers, boolean isAppIdProvided,
ApplicationId applicationId, long attemptFailuresValidityInterval,
LogAggregationContext logAggregationContext,
boolean cancelTokensWhenComplete, Priority priority)
boolean cancelTokensWhenComplete, Priority priority) throws Exception {
return submitApp(capability, name, user, acls, unmanaged, queue,
maxAppAttempts, ts, appType, waitForAccepted, keepContainers,
isAppIdProvided, applicationId, attemptFailuresValidityInterval,
logAggregationContext, cancelTokensWhenComplete, priority, "");
}
public RMApp submitApp(Resource capability, String name, String user,
Map<ApplicationAccessType, String> acls, boolean unmanaged, String queue,
int maxAppAttempts, Credentials ts, String appType,
boolean waitForAccepted, boolean keepContainers, boolean isAppIdProvided,
ApplicationId applicationId, long attemptFailuresValidityInterval,
LogAggregationContext logAggregationContext,
boolean cancelTokensWhenComplete, Priority priority, String amLabel)
throws Exception {
ApplicationId appId = isAppIdProvided ? applicationId : null;
ApplicationClientProtocol client = getClientRMService();
@ -492,6 +518,12 @@ public class MockRM extends ResourceManager {
sub.setLogAggregationContext(logAggregationContext);
}
sub.setCancelTokensWhenComplete(cancelTokensWhenComplete);
if (amLabel != null && !amLabel.isEmpty()) {
ResourceRequest amResourceRequest = ResourceRequest.newInstance(
Priority.newInstance(0), ResourceRequest.ANY, capability, 1);
amResourceRequest.setNodeLabelExpression(amLabel.trim());
sub.setAMContainerResourceRequest(amResourceRequest);
}
req.setApplicationSubmissionContext(sub);
UserGroupInformation fakeUser =
UserGroupInformation.createUserForTesting(user, new String[] {"someGroup"});

View File

@ -46,6 +46,7 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
@ -154,6 +155,10 @@ public class TestApplicationLimits {
doReturn(user).when(application).getUser();
doReturn(amResource).when(application).getAMResource();
doReturn(Priority.newInstance(0)).when(application).getPriority();
doReturn(CommonNodeLabelsManager.NO_LABEL).when(application)
.getAppAMNodePartitionName();
doReturn(amResource).when(application).getAMResource(
CommonNodeLabelsManager.NO_LABEL);
when(application.compareInputOrderTo(any(FiCaSchedulerApp.class))).thenCallRealMethod();
return application;
}

View File

@ -0,0 +1,511 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import java.io.IOException;
import java.util.ArrayList;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
public class TestApplicationLimitsByPartition {
final static int GB = 1024;
LeafQueue queue;
RMNodeLabelsManager mgr;
private YarnConfiguration conf;
RMContext rmContext = null;
@Before
public void setUp() throws IOException {
conf = new YarnConfiguration();
conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
ResourceScheduler.class);
mgr = new NullRMNodeLabelsManager();
mgr.init(conf);
}
private void simpleNodeLabelMappingToManager() throws IOException {
// set node -> label
mgr.addToCluserNodeLabelsWithDefaultExclusivity(ImmutableSet.of("x", "y"));
mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0),
TestUtils.toSet("x"), NodeId.newInstance("h2", 0),
TestUtils.toSet("y")));
}
private void complexNodeLabelMappingToManager() throws IOException {
// set node -> label
mgr.addToCluserNodeLabelsWithDefaultExclusivity(ImmutableSet.of("x", "y",
"z"));
mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0),
TestUtils.toSet("x"), NodeId.newInstance("h2", 0),
TestUtils.toSet("y"), NodeId.newInstance("h3", 0),
TestUtils.toSet("y"), NodeId.newInstance("h4", 0),
TestUtils.toSet("z"), NodeId.newInstance("h5", 0),
RMNodeLabelsManager.EMPTY_STRING_SET));
}
@Test(timeout = 120000)
public void testAMResourceLimitWithLabels() throws Exception {
/*
* Test Case:
* Verify AM resource limit per partition level and per queue level. So
* we use 2 queues to verify this case.
* Queue a1 supports labels (x,y). Configure am-resource-limit as 0.2 (x)
* Queue c1 supports default label. Configure am-resource-limit as 0.2
*
* Queue A1 for label X can only support 2Gb AM resource.
* Queue C1 (empty label) can support 2Gb AM resource.
*
* Verify atleast one AM is launched, and AM resources should not go more
* than 2GB in each queue.
*/
simpleNodeLabelMappingToManager();
CapacitySchedulerConfiguration config = (CapacitySchedulerConfiguration)
TestUtils.getConfigurationWithQueueLabels(conf);
// After getting queue conf, configure AM resource percent for Queue A1
// as 0.2 (Label X) and for Queue C1 as 0.2 (Empty Label)
final String A1 = CapacitySchedulerConfiguration.ROOT + ".a" + ".a1";
final String C1 = CapacitySchedulerConfiguration.ROOT + ".c" + ".c1";
config.setMaximumAMResourcePercentPerPartition(A1, "x", 0.2f);
config.setMaximumApplicationMasterResourcePerQueuePercent(C1, 0.2f);
// Now inject node label manager with this updated config
MockRM rm1 = new MockRM(config) {
@Override
public RMNodeLabelsManager createNodeLabelManager() {
return mgr;
}
};
rm1.getRMContext().setNodeLabelManager(mgr);
rm1.start();
MockNM nm1 = rm1.registerNode("h1:1234", 10 * GB); // label = x
rm1.registerNode("h2:1234", 10 * GB); // label = y
MockNM nm3 = rm1.registerNode("h3:1234", 10 * GB); // label = <empty>
// Submit app1 with 1Gb AM resource to Queue A1 for label X
RMApp app1 = rm1.submitApp(GB, "app", "user", null, "a1", "x");
MockRM.launchAndRegisterAM(app1, rm1, nm1);
// Submit app2 with 1Gb AM resource to Queue A1 for label X
RMApp app2 = rm1.submitApp(GB, "app", "user", null, "a1", "x");
MockRM.launchAndRegisterAM(app2, rm1, nm1);
// Submit 3rd app to Queue A1 for label X, and this will be pending as
// AM limit is already crossed for label X. (2GB)
rm1.submitApp(GB, "app", "user", null, "a1", "x");
CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
LeafQueue leafQueue = (LeafQueue) cs.getQueue("a1");
Assert.assertNotNull(leafQueue);
// Only one AM will be activated here and second AM will be still
// pending.
Assert.assertEquals(2, leafQueue.getNumActiveApplications());
Assert.assertEquals(1, leafQueue.getNumPendingApplications());
// Now verify the same test case in Queue C1 where label is not configured.
// Submit an app to Queue C1 with empty label
RMApp app3 = rm1.submitApp(GB, "app", "user", null, "c1");
MockRM.launchAndRegisterAM(app3, rm1, nm3);
// Submit next app to Queue C1 with empty label
RMApp app4 = rm1.submitApp(GB, "app", "user", null, "c1");
MockRM.launchAndRegisterAM(app4, rm1, nm3);
// Submit 3rd app to Queue C1. This will be pending as Queue's am-limit
// is reached.
rm1.submitApp(GB, "app", "user", null, "c1");
leafQueue = (LeafQueue) cs.getQueue("c1");
Assert.assertNotNull(leafQueue);
// 2 apps will be activated, third one will be pending as am-limit
// is reached.
Assert.assertEquals(2, leafQueue.getNumActiveApplications());
Assert.assertEquals(1, leafQueue.getNumPendingApplications());
rm1.killApp(app3.getApplicationId());
Thread.sleep(1000);
// After killing one running app, pending app will also get activated.
Assert.assertEquals(2, leafQueue.getNumActiveApplications());
Assert.assertEquals(0, leafQueue.getNumPendingApplications());
rm1.close();
}
@Test(timeout = 120000)
public void testAtleastOneAMRunPerPartition() throws Exception {
/*
* Test Case:
* Even though am-resource-limit per queue/partition may cross if we
* activate an app (high am resource demand), we have to activate it
* since no other apps are running in that Queue/Partition. Here also
* we run one test case for partition level and one in queue level to
* ensure no breakage in existing functionality.
*
* Queue a1 supports labels (x,y). Configure am-resource-limit as 0.15 (x)
* Queue c1 supports default label. Configure am-resource-limit as 0.15
*
* Queue A1 for label X can only support 1.5Gb AM resource.
* Queue C1 (empty label) can support 1.5Gb AM resource.
*
* Verify atleast one AM is launched in each Queue.
*/
simpleNodeLabelMappingToManager();
CapacitySchedulerConfiguration config = (CapacitySchedulerConfiguration)
TestUtils.getConfigurationWithQueueLabels(conf);
// After getting queue conf, configure AM resource percent for Queue A1
// as 0.15 (Label X) and for Queue C1 as 0.15 (Empty Label)
final String A1 = CapacitySchedulerConfiguration.ROOT + ".a" + ".a1";
final String C1 = CapacitySchedulerConfiguration.ROOT + ".c" + ".c1";
config.setMaximumAMResourcePercentPerPartition(A1, "x", 0.15f);
config.setMaximumApplicationMasterResourcePerQueuePercent(C1, 0.15f);
// inject node label manager
MockRM rm1 = new MockRM(config) {
@Override
public RMNodeLabelsManager createNodeLabelManager() {
return mgr;
}
};
rm1.getRMContext().setNodeLabelManager(mgr);
rm1.start();
MockNM nm1 = rm1.registerNode("h1:1234", 10 * GB); // label = x
rm1.registerNode("h2:1234", 10 * GB); // label = y
MockNM nm3 = rm1.registerNode("h3:1234", 10 * GB); // label = <empty>
// Submit app1 (2 GB) to Queue A1 and label X
RMApp app1 = rm1.submitApp(2 * GB, "app", "user", null, "a1", "x");
// This app must be activated eventhough the am-resource per-partition
// limit is only for 1.5GB.
MockRM.launchAndRegisterAM(app1, rm1, nm1);
// Submit 2nd app to label "X" with one GB and it must be pending since
// am-resource per-partition limit is crossed (1.5 GB was the limit).
rm1.submitApp(GB, "app", "user", null, "a1", "x");
CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
LeafQueue leafQueue = (LeafQueue) cs.getQueue("a1");
Assert.assertNotNull(leafQueue);
// Only 1 app will be activated as am-limit for partition "x" is 0.15
Assert.assertEquals(1, leafQueue.getNumActiveApplications());
Assert.assertEquals(1, leafQueue.getNumPendingApplications());
// Now verify the same test case in Queue C1 which takes default label
// to see queue level am-resource-limit is still working as expected.
// Submit an app to Queue C1 with empty label (2 GB)
RMApp app3 = rm1.submitApp(2 * GB, "app", "user", null, "c1");
// This app must be activated even though the am-resource per-queue
// limit is only for 1.5GB
MockRM.launchAndRegisterAM(app3, rm1, nm3);
// Submit 2nd app to C1 (Default label, hence am-limit per-queue will be
// considered).
rm1.submitApp(GB, "app", "user", null, "c1");
leafQueue = (LeafQueue) cs.getQueue("c1");
Assert.assertNotNull(leafQueue);
// 1 app will be activated (and it has AM resource more than queue limit)
Assert.assertEquals(1, leafQueue.getNumActiveApplications());
Assert.assertEquals(1, leafQueue.getNumPendingApplications());
rm1.close();
}
@Test(timeout = 120000)
public void testDefaultAMLimitFromQueueForPartition() throws Exception {
/*
* Test Case:
* Configure AM resource limit per queue level. If partition level config
* is not found, we will be considering per-queue level am-limit. Ensure
* this is working as expected.
*
* Queue A1 am-resource limit to be configured as 0.2 (not for partition x)
*
* Eventhough per-partition level config is not done, CS should consider
* the configuration done for queue level.
*/
simpleNodeLabelMappingToManager();
CapacitySchedulerConfiguration config = (CapacitySchedulerConfiguration)
TestUtils.getConfigurationWithQueueLabels(conf);
// After getting queue conf, configure AM resource percent for Queue A1
// as 0.2 (not for partition, rather in queue level)
final String A1 = CapacitySchedulerConfiguration.ROOT + ".a" + ".a1";
config.setMaximumApplicationMasterResourcePerQueuePercent(A1, 0.2f);
// inject node label manager
MockRM rm1 = new MockRM(config) {
@Override
public RMNodeLabelsManager createNodeLabelManager() {
return mgr;
}
};
rm1.getRMContext().setNodeLabelManager(mgr);
rm1.start();
MockNM nm1 = rm1.registerNode("h1:1234", 10 * GB); // label = x
rm1.registerNode("h2:1234", 10 * GB); // label = y
rm1.registerNode("h3:1234", 10 * GB); // label = <empty>
// Submit app1 (2 GB) to Queue A1 and label X
RMApp app1 = rm1.submitApp(2 * GB, "app", "user", null, "a1", "x");
MockRM.launchAndRegisterAM(app1, rm1, nm1);
// Submit 2nd app to label "X" with one GB. Since queue am-limit is 2GB,
// 2nd app will be pending and first one will get activated.
rm1.submitApp(GB, "app", "user", null, "a1", "x");
CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
LeafQueue leafQueue = (LeafQueue) cs.getQueue("a1");
Assert.assertNotNull(leafQueue);
// Only 1 app will be activated as am-limit for queue is 0.2 and same is
// used for partition "x" also.
Assert.assertEquals(1, leafQueue.getNumActiveApplications());
Assert.assertEquals(1, leafQueue.getNumPendingApplications());
rm1.close();
}
@Test(timeout = 120000)
public void testUserAMResourceLimitWithLabels() throws Exception {
/*
* Test Case:
* Verify user level AM resource limit. This test case is ran with two
* users. And per-partition level am-resource-limit will be 0.4, which
* internally will be 4GB. Hence 2GB will be available for each
* user for its AM resource.
*
* Now this test case will create a scenario where AM resource limit per
* partition is not met, but user level am-resource limit is reached.
* Hence app will be pending.
*/
final String user_0 = "user_0";
final String user_1 = "user_1";
simpleNodeLabelMappingToManager();
CapacitySchedulerConfiguration config = (CapacitySchedulerConfiguration)
TestUtils.getConfigurationWithQueueLabels(conf);
// After getting queue conf, configure AM resource percent for Queue A1
// as 0.4 (Label X). Also set userlimit as 50% for this queue. So when we
// have two users submitting applications, each user will get 50% of AM
// resource which is available in this partition.
final String A1 = CapacitySchedulerConfiguration.ROOT + ".a" + ".a1";
config.setMaximumAMResourcePercentPerPartition(A1, "x", 0.4f);
config.setUserLimit(A1, 50);
// Now inject node label manager with this updated config
MockRM rm1 = new MockRM(config) {
@Override
public RMNodeLabelsManager createNodeLabelManager() {
return mgr;
}
};
rm1.getRMContext().setNodeLabelManager(mgr);
rm1.start();
MockNM nm1 = rm1.registerNode("h1:1234", 10 * GB); // label = x
rm1.registerNode("h2:1234", 10 * GB); // label = y
rm1.registerNode("h3:1234", 10 * GB); // label = <empty>
// Submit app1 with 1Gb AM resource to Queue A1 for label X for user0
RMApp app1 = rm1.submitApp(GB, "app", user_0, null, "a1", "x");
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
// Place few allocate requests to make it an active application
am1.allocate("*", 1 * GB, 15, new ArrayList<ContainerId>(), "");
// Now submit 2nd app to Queue A1 for label X for user1
RMApp app2 = rm1.submitApp(GB, "app", user_1, null, "a1", "x");
MockRM.launchAndRegisterAM(app2, rm1, nm1);
CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
LeafQueue leafQueue = (LeafQueue) cs.getQueue("a1");
Assert.assertNotNull(leafQueue);
// Verify active applications count in this queue.
Assert.assertEquals(2, leafQueue.getNumActiveApplications());
Assert.assertEquals(1, leafQueue.getNumActiveApplications(user_0));
Assert.assertEquals(0, leafQueue.getNumPendingApplications());
// Submit 3rd app to Queue A1 for label X for user1. Now user1 will have
// 2 applications (2 GB resource) and user0 will have one app (1GB).
RMApp app3 = rm1.submitApp(GB, "app", user_1, null, "a1", "x");
MockAM am2 = MockRM.launchAndRegisterAM(app3, rm1, nm1);
// Place few allocate requests to make it an active application. This is
// to ensure that user1 and user0 are active users.
am2.allocate("*", 1 * GB, 10, new ArrayList<ContainerId>(), "");
// Submit final app to Queue A1 for label X. Since we are trying to submit
// for user1, we need 3Gb resource for AMs.
// 4Gb -> 40% of label "X" in queue A1
// Since we have 2 users, 50% of 4Gb will be max for each user. Here user1
// has already crossed this 2GB limit, hence this app will be pending.
rm1.submitApp(GB, "app", user_1, null, "a1", "x");
// Verify active applications count per user and also in queue level.
Assert.assertEquals(3, leafQueue.getNumActiveApplications());
Assert.assertEquals(1, leafQueue.getNumActiveApplications(user_0));
Assert.assertEquals(2, leafQueue.getNumActiveApplications(user_1));
Assert.assertEquals(1, leafQueue.getNumPendingApplications(user_1));
Assert.assertEquals(1, leafQueue.getNumPendingApplications());
rm1.close();
}
@Test
public void testAMResourceLimitForMultipleApplications() throws Exception {
/*
* Test Case:
* In a complex node label setup, verify am-resource-percentage calculation
* and check whether applications can get activated as per expectation.
*/
complexNodeLabelMappingToManager();
CapacitySchedulerConfiguration config = (CapacitySchedulerConfiguration)
TestUtils.getComplexConfigurationWithQueueLabels(conf);
/*
* Queue structure:
* root (*)
* ________________
* / \
* a x(100%), y(50%) b y(50%), z(100%)
* ________________ ______________
* / / \
* a1 (x,y) b1(no) b2(y,z)
* 100% y = 100%, z = 100%
*
* Node structure:
* h1 : x
* h2 : y
* h3 : y
* h4 : z
* h5 : NO
*
* Total resource:
* x: 10G
* y: 20G
* z: 10G
* *: 10G
*
* AM resource percentage config:
* A1 : 0.25
* B2 : 0.15
*/
final String A1 = CapacitySchedulerConfiguration.ROOT + ".a" + ".a1";
final String B1 = CapacitySchedulerConfiguration.ROOT + ".b" + ".b1";
config.setMaximumAMResourcePercentPerPartition(A1, "y", 0.25f);
config.setMaximumApplicationMasterResourcePerQueuePercent(B1, 0.15f);
// Now inject node label manager with this updated config
MockRM rm1 = new MockRM(config) {
@Override
public RMNodeLabelsManager createNodeLabelManager() {
return mgr;
}
};
rm1.getRMContext().setNodeLabelManager(mgr);
rm1.start();
rm1.registerNode("h1:1234", 10 * GB); // label = x
MockNM nm2 = rm1.registerNode("h2:1234", 10 * GB); // label = y
MockNM nm3 = rm1.registerNode("h3:1234", 10 * GB); // label = y
rm1.registerNode("h4:1234", 10 * GB); // label = z
MockNM nm5 = rm1.registerNode("h5:1234", 10 * GB); // label = <empty>
// Submit app1 with 2Gb AM resource to Queue A1 for label Y
RMApp app1 = rm1.submitApp(2 * GB, "app", "user", null, "a1", "y");
MockRM.launchAndRegisterAM(app1, rm1, nm2);
// Submit app2 with 1Gb AM resource to Queue A1 for label Y
RMApp app2 = rm1.submitApp(GB, "app", "user", null, "a1", "y");
MockRM.launchAndRegisterAM(app2, rm1, nm3);
// Submit another app with 1Gb AM resource to Queue A1 for label Y
rm1.submitApp(GB, "app", "user", null, "a1", "y");
CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
LeafQueue leafQueue = (LeafQueue) cs.getQueue("a1");
Assert.assertNotNull(leafQueue);
/*
* capacity of queue A -> 50% for label Y
* capacity of queue A1 -> 100% for label Y
*
* Total resources available for label Y -> 20GB (nm2 and nm3)
* Hence in queue A1, max resource for label Y is 10GB.
*
* AM resource percent config for queue A1 -> 0.25
* ==> 2.5Gb (3 Gb) is max-am-resource-limit
*/
Assert.assertEquals(2, leafQueue.getNumActiveApplications());
Assert.assertEquals(1, leafQueue.getNumPendingApplications());
// Submit app3 with 1Gb AM resource to Queue B1 (no_label)
RMApp app3 = rm1.submitApp(GB, "app", "user", null, "b1");
MockRM.launchAndRegisterAM(app3, rm1, nm5);
// Submit another app with 1Gb AM resource to Queue B1 (no_label)
rm1.submitApp(GB, "app", "user", null, "b1");
leafQueue = (LeafQueue) cs.getQueue("b1");
Assert.assertNotNull(leafQueue);
/*
* capacity of queue B -> 90% for queue
* -> and 100% for no-label
* capacity of queue B1 -> 50% for no-label/queue
*
* Total resources available for no-label -> 10GB (nm5)
* Hence in queue B1, max resource for no-label is 5GB.
*
* AM resource percent config for queue B1 -> 0.15
* ==> 1Gb is max-am-resource-limit
*
* Only one app will be activated and all othe will be pending.
*/
Assert.assertEquals(1, leafQueue.getNumActiveApplications());
Assert.assertEquals(1, leafQueue.getNumPendingApplications());
rm1.close();
}
}

View File

@ -95,7 +95,11 @@ public class TestCapacitySchedulerNodeLabelUpdate {
private void checkUsedResource(MockRM rm, String queueName, int memory) {
checkUsedResource(rm, queueName, memory, RMNodeLabelsManager.NO_LABEL);
}
private void checkAMUsedResource(MockRM rm, String queueName, int memory) {
checkAMUsedResource(rm, queueName, memory, RMNodeLabelsManager.NO_LABEL);
}
private void checkUsedResource(MockRM rm, String queueName, int memory,
String label) {
CapacityScheduler scheduler = (CapacityScheduler) rm.getResourceScheduler();
@ -104,6 +108,14 @@ public class TestCapacitySchedulerNodeLabelUpdate {
.getMemory());
}
private void checkAMUsedResource(MockRM rm, String queueName, int memory,
String label) {
CapacityScheduler scheduler = (CapacityScheduler) rm.getResourceScheduler();
CSQueue queue = scheduler.getQueue(queueName);
Assert.assertEquals(memory, queue.getQueueResourceUsage().getAMUsed(label)
.getMemory());
}
private void checkUserUsedResource(MockRM rm, String queueName,
String userName, String partition, int memory) {
CapacityScheduler scheduler = (CapacityScheduler) rm.getResourceScheduler();
@ -424,4 +436,93 @@ public class TestCapacitySchedulerNodeLabelUpdate {
rm.close();
}
@Test (timeout = 60000)
public void testAMResourceUsageWhenNodeUpdatesPartition()
throws Exception {
// set node -> label
mgr.addToCluserNodeLabelsWithDefaultExclusivity(ImmutableSet.of("x", "y", "z"));
// set mapping:
// h1 -> x
// h2 -> y
mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x")));
mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h2", 0), toSet("y")));
// inject node label manager
MockRM rm = new MockRM(getConfigurationWithQueueLabels(conf)) {
@Override
public RMNodeLabelsManager createNodeLabelManager() {
return mgr;
}
};
rm.getRMContext().setNodeLabelManager(mgr);
rm.start();
MockNM nm1 = rm.registerNode("h1:1234", 8000);
rm.registerNode("h2:1234", 8000);
rm.registerNode("h3:1234", 8000);
ContainerId containerId2;
// launch an app to queue a1 (label = x), and check all container will
// be allocated in h1
RMApp app1 = rm.submitApp(GB, "app", "user", null, "a", "x");
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1);
// request a container.
am1.allocate("*", GB, 1, new ArrayList<ContainerId>(), "x");
ContainerId.newContainerId(am1.getApplicationAttemptId(), 1);
containerId2 = ContainerId.newContainerId(am1.getApplicationAttemptId(), 2);
Assert.assertTrue(rm.waitForState(nm1, containerId2,
RMContainerState.ALLOCATED, 10 * 1000));
// check used resource:
// queue-a used x=2G
checkUsedResource(rm, "a", 2048, "x");
checkAMUsedResource(rm, "a", 1024, "x");
CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
FiCaSchedulerApp app = cs.getApplicationAttempt(am1.getApplicationAttemptId());
// change h1's label to z
cs.handle(new NodeLabelsUpdateSchedulerEvent(ImmutableMap.of(nm1.getNodeId(),
toSet("z"))));
// Now the resources also should change from x to z. Verify AM and normal
// used resource are successfully changed.
checkUsedResource(rm, "a", 0, "x");
checkUsedResource(rm, "a", 2048, "z");
checkAMUsedResource(rm, "a", 0, "x");
checkAMUsedResource(rm, "a", 1024, "z");
checkUserUsedResource(rm, "a", "user", "x", 0);
checkUserUsedResource(rm, "a", "user", "z", 2048);
Assert.assertEquals(0,
app.getAppAttemptResourceUsage().getAMUsed("x").getMemory());
Assert.assertEquals(1024,
app.getAppAttemptResourceUsage().getAMUsed("z").getMemory());
// change h1's label to no label
Set<String> emptyLabels = new HashSet<>();
Map<NodeId,Set<String>> map = ImmutableMap.of(nm1.getNodeId(),
emptyLabels);
cs.handle(new NodeLabelsUpdateSchedulerEvent(map));
checkUsedResource(rm, "a", 0, "x");
checkUsedResource(rm, "a", 0, "z");
checkUsedResource(rm, "a", 2048);
checkAMUsedResource(rm, "a", 0, "x");
checkAMUsedResource(rm, "a", 0, "z");
checkAMUsedResource(rm, "a", 1024);
checkUserUsedResource(rm, "a", "user", "x", 0);
checkUserUsedResource(rm, "a", "user", "z", 0);
checkUserUsedResource(rm, "a", "user", "", 2048);
Assert.assertEquals(0,
app.getAppAttemptResourceUsage().getAMUsed("x").getMemory());
Assert.assertEquals(0,
app.getAppAttemptResourceUsage().getAMUsed("z").getMemory());
Assert.assertEquals(1024,
app.getAppAttemptResourceUsage().getAMUsed("").getMemory());
rm.close();
}
}

View File

@ -221,13 +221,13 @@ public class TestUtils {
when(container.getPriority()).thenReturn(priority);
return container;
}
@SuppressWarnings("unchecked")
private static <E> Set<E> toSet(E... elements) {
public static <E> Set<E> toSet(E... elements) {
Set<E> set = Sets.newHashSet(elements);
return set;
}
/**
* Get a queue structure:
* <pre>