YARN-3099. Capacity Scheduler LeafQueue/ParentQueue should use ResourceUsage to track used-resources-by-label. Contributed by Wangda Tan

(cherry picked from commit 86358221fc)

(cherry picked from commit cabf97ae4f)
This commit is contained in:
Jian He 2015-01-30 15:15:20 -08:00 committed by Vinod Kumar Vavilapalli
parent b0ad553841
commit d9281fbbab
6 changed files with 93 additions and 137 deletions

View File

@ -180,6 +180,9 @@ Release 2.6.1 - UNRELEASED
YARN-2978. Fixed potential NPE while getting queue info. (Varun Saxena via YARN-2978. Fixed potential NPE while getting queue info. (Varun Saxena via
jianhe) jianhe)
YARN-3099. Capacity Scheduler LeafQueue/ParentQueue should use ResourceUsage
to track used-resources-by-label.(Wangda Tan via jianhe)
Release 2.6.0 - 2014-11-18 Release 2.6.0 - 2014-11-18
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -35,10 +35,11 @@ import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Sets; import com.google.common.collect.Sets;
public abstract class AbstractCSQueue implements CSQueue { public abstract class AbstractCSQueue implements CSQueue {
@ -64,16 +65,17 @@ public abstract class AbstractCSQueue implements CSQueue {
Set<String> accessibleLabels; Set<String> accessibleLabels;
RMNodeLabelsManager labelManager; RMNodeLabelsManager labelManager;
String defaultLabelExpression; String defaultLabelExpression;
Resource usedResources = Resources.createResource(0, 0);
Map<String, Float> absoluteCapacityByNodeLabels; Map<String, Float> absoluteCapacityByNodeLabels;
Map<String, Float> capacitiyByNodeLabels; Map<String, Float> capacitiyByNodeLabels;
Map<String, Resource> usedResourcesByNodeLabels = new HashMap<String, Resource>();
Map<String, Float> absoluteMaxCapacityByNodeLabels; Map<String, Float> absoluteMaxCapacityByNodeLabels;
Map<String, Float> maxCapacityByNodeLabels; Map<String, Float> maxCapacityByNodeLabels;
Map<QueueACL, AccessControlList> acls = Map<QueueACL, AccessControlList> acls =
new HashMap<QueueACL, AccessControlList>(); new HashMap<QueueACL, AccessControlList>();
boolean reservationsContinueLooking; boolean reservationsContinueLooking;
// Track resource usage-by-label like used-resource/pending-resource, etc.
ResourceUsage queueUsage;
private final RecordFactory recordFactory = private final RecordFactory recordFactory =
RecordFactoryProvider.getRecordFactory(null); RecordFactoryProvider.getRecordFactory(null);
@ -120,6 +122,7 @@ public abstract class AbstractCSQueue implements CSQueue {
maxCapacityByNodeLabels = maxCapacityByNodeLabels =
cs.getConfiguration().getMaximumNodeLabelCapacities(getQueuePath(), cs.getConfiguration().getMaximumNodeLabelCapacities(getQueuePath(),
accessibleLabels, labelManager); accessibleLabels, labelManager);
queueUsage = new ResourceUsage();
} }
@Override @Override
@ -153,8 +156,8 @@ public abstract class AbstractCSQueue implements CSQueue {
} }
@Override @Override
public synchronized Resource getUsedResources() { public Resource getUsedResources() {
return usedResources; return queueUsage.getUsed();
} }
public synchronized int getNumContainers() { public synchronized int getNumContainers() {
@ -344,22 +347,13 @@ public abstract class AbstractCSQueue implements CSQueue {
synchronized void allocateResource(Resource clusterResource, synchronized void allocateResource(Resource clusterResource,
Resource resource, Set<String> nodeLabels) { Resource resource, Set<String> nodeLabels) {
Resources.addTo(usedResources, resource);
// Update usedResources by labels // Update usedResources by labels
if (nodeLabels == null || nodeLabels.isEmpty()) { if (nodeLabels == null || nodeLabels.isEmpty()) {
if (!usedResourcesByNodeLabels.containsKey(RMNodeLabelsManager.NO_LABEL)) { queueUsage.incUsed(resource);
usedResourcesByNodeLabels.put(RMNodeLabelsManager.NO_LABEL,
Resources.createResource(0));
}
Resources.addTo(usedResourcesByNodeLabels.get(RMNodeLabelsManager.NO_LABEL),
resource);
} else { } else {
for (String label : Sets.intersection(accessibleLabels, nodeLabels)) { for (String label : Sets.intersection(accessibleLabels, nodeLabels)) {
if (!usedResourcesByNodeLabels.containsKey(label)) { queueUsage.incUsed(label, resource);
usedResourcesByNodeLabels.put(label, Resources.createResource(0));
}
Resources.addTo(usedResourcesByNodeLabels.get(label), resource);
} }
} }
@ -370,23 +364,12 @@ public abstract class AbstractCSQueue implements CSQueue {
protected synchronized void releaseResource(Resource clusterResource, protected synchronized void releaseResource(Resource clusterResource,
Resource resource, Set<String> nodeLabels) { Resource resource, Set<String> nodeLabels) {
// Update queue metrics
Resources.subtractFrom(usedResources, resource);
// Update usedResources by labels // Update usedResources by labels
if (null == nodeLabels || nodeLabels.isEmpty()) { if (null == nodeLabels || nodeLabels.isEmpty()) {
if (!usedResourcesByNodeLabels.containsKey(RMNodeLabelsManager.NO_LABEL)) { queueUsage.decUsed(resource);
usedResourcesByNodeLabels.put(RMNodeLabelsManager.NO_LABEL,
Resources.createResource(0));
}
Resources.subtractFrom(
usedResourcesByNodeLabels.get(RMNodeLabelsManager.NO_LABEL), resource);
} else { } else {
for (String label : Sets.intersection(accessibleLabels, nodeLabels)) { for (String label : Sets.intersection(accessibleLabels, nodeLabels)) {
if (!usedResourcesByNodeLabels.containsKey(label)) { queueUsage.decUsed(label, resource);
usedResourcesByNodeLabels.put(label, Resources.createResource(0));
}
Resources.subtractFrom(usedResourcesByNodeLabels.get(label), resource);
} }
} }
@ -452,6 +435,11 @@ public abstract class AbstractCSQueue implements CSQueue {
@Private @Private
public Resource getUsedResourceByLabel(String nodeLabel) { public Resource getUsedResourceByLabel(String nodeLabel) {
return usedResourcesByNodeLabels.get(nodeLabel); return queueUsage.getUsed(nodeLabel);
}
@VisibleForTesting
public ResourceUsage getResourceUsage() {
return queueUsage;
} }
} }

View File

@ -61,6 +61,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEven
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
@ -117,10 +118,6 @@ public class LeafQueue extends AbstractCSQueue {
private volatile float absoluteMaxAvailCapacity; private volatile float absoluteMaxAvailCapacity;
// sum of resources used by application masters for applications
// running in this queue
private final Resource usedAMResources = Resource.newInstance(0, 0);
public LeafQueue(CapacitySchedulerContext cs, public LeafQueue(CapacitySchedulerContext cs,
String queueName, CSQueue parent, CSQueue old) throws IOException { String queueName, CSQueue parent, CSQueue old) throws IOException {
super(cs, queueName, parent, old); super(cs, queueName, parent, old);
@ -440,7 +437,7 @@ public class LeafQueue extends AbstractCSQueue {
return queueName + ": " + return queueName + ": " +
"capacity=" + capacity + ", " + "capacity=" + capacity + ", " +
"absoluteCapacity=" + absoluteCapacity + ", " + "absoluteCapacity=" + absoluteCapacity + ", " +
"usedResources=" + usedResources + ", " + "usedResources=" + queueUsage.getUsed() + ", " +
"usedCapacity=" + getUsedCapacity() + ", " + "usedCapacity=" + getUsedCapacity() + ", " +
"absoluteUsedCapacity=" + getAbsoluteUsedCapacity() + ", " + "absoluteUsedCapacity=" + getAbsoluteUsedCapacity() + ", " +
"numApps=" + getNumApplications() + ", " + "numApps=" + getNumApplications() + ", " +
@ -469,7 +466,7 @@ public class LeafQueue extends AbstractCSQueue {
ArrayList<UserInfo> usersToReturn = new ArrayList<UserInfo>(); ArrayList<UserInfo> usersToReturn = new ArrayList<UserInfo>();
for (Map.Entry<String, User> entry: users.entrySet()) { for (Map.Entry<String, User> entry: users.entrySet()) {
usersToReturn.add(new UserInfo(entry.getKey(), Resources.clone( usersToReturn.add(new UserInfo(entry.getKey(), Resources.clone(
entry.getValue().consumed), entry.getValue().getActiveApplications(), entry.getValue().getUsed()), entry.getValue().getActiveApplications(),
entry.getValue().getPendingApplications())); entry.getValue().getPendingApplications()));
} }
return usersToReturn; return usersToReturn;
@ -638,7 +635,7 @@ public class LeafQueue extends AbstractCSQueue {
// Check am resource limit // Check am resource limit
Resource amIfStarted = Resource amIfStarted =
Resources.add(application.getAMResource(), usedAMResources); Resources.add(application.getAMResource(), queueUsage.getAMUsed());
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("application AMResource " + application.getAMResource() + LOG.debug("application AMResource " + application.getAMResource() +
@ -683,9 +680,8 @@ public class LeafQueue extends AbstractCSQueue {
} }
user.activateApplication(); user.activateApplication();
activeApplications.add(application); activeApplications.add(application);
Resources.addTo(usedAMResources, application.getAMResource()); queueUsage.incAMUsed(application.getAMResource());
Resources.addTo(user.getConsumedAMResources(), user.getResourceUsage().incAMUsed(application.getAMResource());
application.getAMResource());
i.remove(); i.remove();
LOG.info("Application " + application.getApplicationId() + LOG.info("Application " + application.getApplicationId() +
" from user: " + application.getUser() + " from user: " + application.getUser() +
@ -736,9 +732,8 @@ public class LeafQueue extends AbstractCSQueue {
if (!wasActive) { if (!wasActive) {
pendingApplications.remove(application); pendingApplications.remove(application);
} else { } else {
Resources.subtractFrom(usedAMResources, application.getAMResource()); queueUsage.decAMUsed(application.getAMResource());
Resources.subtractFrom(user.getConsumedAMResources(), user.getResourceUsage().decAMUsed(application.getAMResource());
application.getAMResource());
} }
applicationAttemptMap.remove(application.getApplicationAttemptId()); applicationAttemptMap.remove(application.getApplicationAttemptId());
@ -987,8 +982,8 @@ public class LeafQueue extends AbstractCSQueue {
*/ */
Resource headroom = Resource headroom =
Resources.min(resourceCalculator, clusterResource, Resources.min(resourceCalculator, clusterResource,
Resources.subtract(userLimit, user.getTotalConsumedResources()), Resources.subtract(userLimit, user.getUsed()),
Resources.subtract(queueMaxCap, usedResources) Resources.subtract(queueMaxCap, queueUsage.getUsed())
); );
return headroom; return headroom;
} }
@ -1008,12 +1003,8 @@ public class LeafQueue extends AbstractCSQueue {
boolean canAssign = true; boolean canAssign = true;
for (String label : labelCanAccess) { for (String label : labelCanAccess) {
if (!usedResourcesByNodeLabels.containsKey(label)) {
usedResourcesByNodeLabels.put(label, Resources.createResource(0));
}
Resource potentialTotalCapacity = Resource potentialTotalCapacity =
Resources.add(usedResourcesByNodeLabels.get(label), required); Resources.add(queueUsage.getUsed(label), required);
float potentialNewCapacity = float potentialNewCapacity =
Resources.divide(resourceCalculator, clusterResource, Resources.divide(resourceCalculator, clusterResource,
@ -1036,14 +1027,14 @@ public class LeafQueue extends AbstractCSQueue {
LOG.debug("try to use reserved: " LOG.debug("try to use reserved: "
+ getQueueName() + getQueueName()
+ " usedResources: " + " usedResources: "
+ usedResources + queueUsage.getUsed()
+ " clusterResources: " + " clusterResources: "
+ clusterResource + clusterResource
+ " reservedResources: " + " reservedResources: "
+ application.getCurrentReservation() + application.getCurrentReservation()
+ " currentCapacity " + " currentCapacity "
+ Resources.divide(resourceCalculator, clusterResource, + Resources.divide(resourceCalculator, clusterResource,
usedResources, clusterResource) + " required " + required queueUsage.getUsed(), clusterResource) + " required " + required
+ " potentialNewWithoutReservedCapacity: " + " potentialNewWithoutReservedCapacity: "
+ potentialNewWithoutReservedCapacity + " ( " + " max-capacity: " + potentialNewWithoutReservedCapacity + " ( " + " max-capacity: "
+ absoluteMaxCapacity + ")"); + absoluteMaxCapacity + ")");
@ -1063,11 +1054,11 @@ public class LeafQueue extends AbstractCSQueue {
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug(getQueueName() LOG.debug(getQueueName()
+ "Check assign to queue, label=" + label + "Check assign to queue, label=" + label
+ " usedResources: " + usedResourcesByNodeLabels.get(label) + " usedResources: " + queueUsage.getUsed(label)
+ " clusterResources: " + clusterResource + " clusterResources: " + clusterResource
+ " currentCapacity " + " currentCapacity "
+ Resources.divide(resourceCalculator, clusterResource, + Resources.divide(resourceCalculator, clusterResource,
usedResourcesByNodeLabels.get(label), queueUsage.getUsed(label),
labelManager.getResourceByLabel(label, clusterResource)) labelManager.getResourceByLabel(label, clusterResource))
+ " potentialNewCapacity: " + potentialNewCapacity + " ( " + " potentialNewCapacity: " + potentialNewCapacity + " ( "
+ " max-capacity: " + absoluteMaxCapacity + ")"); + " max-capacity: " + absoluteMaxCapacity + ")");
@ -1118,7 +1109,7 @@ public class LeafQueue extends AbstractCSQueue {
LOG.debug("Headroom calculation for user " + user + ": " + LOG.debug("Headroom calculation for user " + user + ": " +
" userLimit=" + userLimit + " userLimit=" + userLimit +
" queueMaxCap=" + queueMaxCap + " queueMaxCap=" + queueMaxCap +
" consumed=" + queueUser.getTotalConsumedResources() + " consumed=" + queueUser.getUsed() +
" headroom=" + headroom); " headroom=" + headroom);
} }
@ -1173,8 +1164,8 @@ public class LeafQueue extends AbstractCSQueue {
Resource currentCapacity = Resource currentCapacity =
Resources.lessThan(resourceCalculator, clusterResource, Resources.lessThan(resourceCalculator, clusterResource,
usedResources, queueCapacity) ? queueUsage.getUsed(), queueCapacity) ?
queueCapacity : Resources.add(usedResources, required); queueCapacity : Resources.add(queueUsage.getUsed(), required);
// Never allow a single user to take more than the // Never allow a single user to take more than the
// queue's configured capacity * user-limit-factor. // queue's configured capacity * user-limit-factor.
@ -1209,10 +1200,10 @@ public class LeafQueue extends AbstractCSQueue {
" userLimit=" + userLimit + " userLimit=" + userLimit +
" userLimitFactor=" + userLimitFactor + " userLimitFactor=" + userLimitFactor +
" required: " + required + " required: " + required +
" consumed: " + user.getTotalConsumedResources() + " consumed: " + user.getUsed() +
" limit: " + limit + " limit: " + limit +
" queueCapacity: " + queueCapacity + " queueCapacity: " + queueCapacity +
" qconsumed: " + usedResources + " qconsumed: " + queueUsage.getUsed() +
" currentCapacity: " + currentCapacity + " currentCapacity: " + currentCapacity +
" activeUsers: " + activeUsers + " activeUsers: " + activeUsers +
" clusterCapacity: " + clusterResource " clusterCapacity: " + clusterResource
@ -1237,7 +1228,7 @@ public class LeafQueue extends AbstractCSQueue {
// overhead of the AM, but it's a > check, not a >= check, so... // overhead of the AM, but it's a > check, not a >= check, so...
if (Resources if (Resources
.greaterThan(resourceCalculator, clusterResource, .greaterThan(resourceCalculator, clusterResource,
user.getConsumedResourceByLabel(label), user.getUsed(label),
limit)) { limit)) {
// if enabled, check to see if could we potentially use this node instead // if enabled, check to see if could we potentially use this node instead
// of a reserved node if the application has reserved containers // of a reserved node if the application has reserved containers
@ -1245,13 +1236,13 @@ public class LeafQueue extends AbstractCSQueue {
if (Resources.lessThanOrEqual( if (Resources.lessThanOrEqual(
resourceCalculator, resourceCalculator,
clusterResource, clusterResource,
Resources.subtract(user.getTotalConsumedResources(), Resources.subtract(user.getUsed(),
application.getCurrentReservation()), limit)) { application.getCurrentReservation()), limit)) {
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("User " + userName + " in queue " + getQueueName() LOG.debug("User " + userName + " in queue " + getQueueName()
+ " will exceed limit based on reservations - " + " consumed: " + " will exceed limit based on reservations - " + " consumed: "
+ user.getTotalConsumedResources() + " reserved: " + user.getUsed() + " reserved: "
+ application.getCurrentReservation() + " limit: " + limit); + application.getCurrentReservation() + " limit: " + limit);
} }
return true; return true;
@ -1260,7 +1251,7 @@ public class LeafQueue extends AbstractCSQueue {
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("User " + userName + " in queue " + getQueueName() LOG.debug("User " + userName + " in queue " + getQueueName()
+ " will exceed limit - " + " consumed: " + " will exceed limit - " + " consumed: "
+ user.getTotalConsumedResources() + " limit: " + limit); + user.getUsed() + " limit: " + limit);
} }
return false; return false;
} }
@ -1682,7 +1673,7 @@ public class LeafQueue extends AbstractCSQueue {
" queue=" + this.toString() + " queue=" + this.toString() +
" usedCapacity=" + getUsedCapacity() + " usedCapacity=" + getUsedCapacity() +
" absoluteUsedCapacity=" + getAbsoluteUsedCapacity() + " absoluteUsedCapacity=" + getAbsoluteUsedCapacity() +
" used=" + usedResources + " used=" + queueUsage.getUsed() +
" cluster=" + clusterResource); " cluster=" + clusterResource);
return request.getCapability(); return request.getCapability();
@ -1783,9 +1774,9 @@ public class LeafQueue extends AbstractCSQueue {
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.info(getQueueName() + LOG.info(getQueueName() +
" user=" + userName + " user=" + userName +
" used=" + usedResources + " numContainers=" + numContainers + " used=" + queueUsage.getUsed() + " numContainers=" + numContainers +
" headroom = " + application.getHeadroom() + " headroom = " + application.getHeadroom() +
" user-resources=" + user.getTotalConsumedResources() " user-resources=" + user.getUsed()
); );
} }
} }
@ -1801,8 +1792,8 @@ public class LeafQueue extends AbstractCSQueue {
metrics.setAvailableResourcesToUser(userName, application.getHeadroom()); metrics.setAvailableResourcesToUser(userName, application.getHeadroom());
LOG.info(getQueueName() + LOG.info(getQueueName() +
" used=" + usedResources + " numContainers=" + numContainers + " used=" + queueUsage.getUsed() + " numContainers=" + numContainers +
" user=" + userName + " user-resources=" + user.getTotalConsumedResources()); " user=" + userName + " user-resources=" + user.getUsed());
} }
private void updateAbsoluteCapacityResource(Resource clusterResource) { private void updateAbsoluteCapacityResource(Resource clusterResource) {
@ -1844,22 +1835,20 @@ public class LeafQueue extends AbstractCSQueue {
@VisibleForTesting @VisibleForTesting
public static class User { public static class User {
Resource consumed = Resources.createResource(0, 0); ResourceUsage userResourceUsage = new ResourceUsage();
Resource consumedAMResources = Resources.createResource(0, 0);
Map<String, Resource> consumedByLabel = new HashMap<String, Resource>();
int pendingApplications = 0; int pendingApplications = 0;
int activeApplications = 0; int activeApplications = 0;
public Resource getTotalConsumedResources() { public ResourceUsage getResourceUsage() {
return consumed; return userResourceUsage;
} }
public Resource getConsumedResourceByLabel(String label) { public Resource getUsed() {
Resource r = consumedByLabel.get(label); return userResourceUsage.getUsed();
if (null != r) { }
return r;
} public Resource getUsed(String label) {
return Resources.none(); return userResourceUsage.getUsed(label);
} }
public int getPendingApplications() { public int getPendingApplications() {
@ -1871,7 +1860,7 @@ public class LeafQueue extends AbstractCSQueue {
} }
public Resource getConsumedAMResources() { public Resource getConsumedAMResources() {
return consumedAMResources; return userResourceUsage.getAMUsed();
} }
public int getTotalApplications() { public int getTotalApplications() {
@ -1896,47 +1885,26 @@ public class LeafQueue extends AbstractCSQueue {
} }
} }
public synchronized void assignContainer(Resource resource, public void assignContainer(Resource resource,
Set<String> nodeLabels) { Set<String> nodeLabels) {
Resources.addTo(consumed, resource);
if (nodeLabels == null || nodeLabels.isEmpty()) { if (nodeLabels == null || nodeLabels.isEmpty()) {
if (!consumedByLabel.containsKey(RMNodeLabelsManager.NO_LABEL)) { userResourceUsage.incUsed(resource);
consumedByLabel.put(RMNodeLabelsManager.NO_LABEL,
Resources.createResource(0));
}
Resources.addTo(consumedByLabel.get(RMNodeLabelsManager.NO_LABEL),
resource);
} else { } else {
for (String label : nodeLabels) { for (String label : nodeLabels) {
if (!consumedByLabel.containsKey(label)) { userResourceUsage.incUsed(label, resource);
consumedByLabel.put(label, Resources.createResource(0));
}
Resources.addTo(consumedByLabel.get(label), resource);
} }
} }
} }
public synchronized void releaseContainer(Resource resource, Set<String> nodeLabels) { public void releaseContainer(Resource resource, Set<String> nodeLabels) {
Resources.subtractFrom(consumed, resource);
// Update usedResources by labels
if (nodeLabels == null || nodeLabels.isEmpty()) { if (nodeLabels == null || nodeLabels.isEmpty()) {
if (!consumedByLabel.containsKey(RMNodeLabelsManager.NO_LABEL)) { userResourceUsage.decUsed(resource);
consumedByLabel.put(RMNodeLabelsManager.NO_LABEL,
Resources.createResource(0));
}
Resources.subtractFrom(
consumedByLabel.get(RMNodeLabelsManager.NO_LABEL), resource);
} else { } else {
for (String label : nodeLabels) { for (String label : nodeLabels) {
if (!consumedByLabel.containsKey(label)) { userResourceUsage.decUsed(label, resource);
consumedByLabel.put(label, Resources.createResource(0));
}
Resources.subtractFrom(consumedByLabel.get(label), resource);
} }
} }
} }
} }
@Override @Override
@ -1995,7 +1963,7 @@ public class LeafQueue extends AbstractCSQueue {
+ " resource=" + rmContainer.getContainer().getResource() + " resource=" + rmContainer.getContainer().getResource()
+ " queueMoveIn=" + this + " usedCapacity=" + getUsedCapacity() + " queueMoveIn=" + this + " usedCapacity=" + getUsedCapacity()
+ " absoluteUsedCapacity=" + getAbsoluteUsedCapacity() + " used=" + " absoluteUsedCapacity=" + getAbsoluteUsedCapacity() + " used="
+ usedResources + " cluster=" + clusterResource); + queueUsage.getUsed() + " cluster=" + clusterResource);
// Inform the parent queue // Inform the parent queue
getParent().attachContainer(clusterResource, application, rmContainer); getParent().attachContainer(clusterResource, application, rmContainer);
} }
@ -2013,7 +1981,7 @@ public class LeafQueue extends AbstractCSQueue {
+ " resource=" + rmContainer.getContainer().getResource() + " resource=" + rmContainer.getContainer().getResource()
+ " queueMoveOut=" + this + " usedCapacity=" + getUsedCapacity() + " queueMoveOut=" + this + " usedCapacity=" + getUsedCapacity()
+ " absoluteUsedCapacity=" + getAbsoluteUsedCapacity() + " used=" + " absoluteUsedCapacity=" + getAbsoluteUsedCapacity() + " used="
+ usedResources + " cluster=" + clusterResource); + queueUsage.getUsed() + " cluster=" + clusterResource);
// Inform the parent queue // Inform the parent queue
getParent().detachContainer(clusterResource, application, rmContainer); getParent().detachContainer(clusterResource, application, rmContainer);
} }

View File

@ -256,7 +256,7 @@ public class ParentQueue extends AbstractCSQueue {
"numChildQueue= " + childQueues.size() + ", " + "numChildQueue= " + childQueues.size() + ", " +
"capacity=" + capacity + ", " + "capacity=" + capacity + ", " +
"absoluteCapacity=" + absoluteCapacity + ", " + "absoluteCapacity=" + absoluteCapacity + ", " +
"usedResources=" + usedResources + "usedResources=" + queueUsage.getUsed() +
"usedCapacity=" + getUsedCapacity() + ", " + "usedCapacity=" + getUsedCapacity() + ", " +
"numApps=" + getNumApplications() + ", " + "numApps=" + getNumApplications() + ", " +
"numContainers=" + getNumContainers(); "numContainers=" + getNumContainers();
@ -463,7 +463,7 @@ public class ParentQueue extends AbstractCSQueue {
" queue=" + getQueueName() + " queue=" + getQueueName() +
" usedCapacity=" + getUsedCapacity() + " usedCapacity=" + getUsedCapacity() +
" absoluteUsedCapacity=" + getAbsoluteUsedCapacity() + " absoluteUsedCapacity=" + getAbsoluteUsedCapacity() +
" used=" + usedResources + " used=" + queueUsage.getUsed() +
" cluster=" + clusterResource); " cluster=" + clusterResource);
} else { } else {
@ -506,19 +506,16 @@ public class ParentQueue extends AbstractCSQueue {
boolean canAssign = true; boolean canAssign = true;
for (String label : labelCanAccess) { for (String label : labelCanAccess) {
if (!usedResourcesByNodeLabels.containsKey(label)) {
usedResourcesByNodeLabels.put(label, Resources.createResource(0));
}
float currentAbsoluteLabelUsedCapacity = float currentAbsoluteLabelUsedCapacity =
Resources.divide(resourceCalculator, clusterResource, Resources.divide(resourceCalculator, clusterResource,
usedResourcesByNodeLabels.get(label), queueUsage.getUsed(label),
labelManager.getResourceByLabel(label, clusterResource)); labelManager.getResourceByLabel(label, clusterResource));
// if any of the label doesn't beyond limit, we can allocate on this node // if any of the label doesn't beyond limit, we can allocate on this node
if (currentAbsoluteLabelUsedCapacity >= if (currentAbsoluteLabelUsedCapacity >=
getAbsoluteMaximumCapacityByNodeLabel(label)) { getAbsoluteMaximumCapacityByNodeLabel(label)) {
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug(getQueueName() + " used=" + usedResources LOG.debug(getQueueName() + " used=" + queueUsage.getUsed()
+ " current-capacity (" + usedResourcesByNodeLabels.get(label) + ") " + " current-capacity (" + queueUsage.getUsed(label) + ") "
+ " >= max-capacity (" + " >= max-capacity ("
+ labelManager.getResourceByLabel(label, clusterResource) + ")"); + labelManager.getResourceByLabel(label, clusterResource) + ")");
} }
@ -540,16 +537,16 @@ public class ParentQueue extends AbstractCSQueue {
.getReservedMB(), getMetrics().getReservedVirtualCores()); .getReservedMB(), getMetrics().getReservedVirtualCores());
float capacityWithoutReservedCapacity = Resources.divide( float capacityWithoutReservedCapacity = Resources.divide(
resourceCalculator, clusterResource, resourceCalculator, clusterResource,
Resources.subtract(usedResources, reservedResources), Resources.subtract(queueUsage.getUsed(), reservedResources),
clusterResource); clusterResource);
if (capacityWithoutReservedCapacity <= absoluteMaxCapacity) { if (capacityWithoutReservedCapacity <= absoluteMaxCapacity) {
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("parent: try to use reserved: " + getQueueName() LOG.debug("parent: try to use reserved: " + getQueueName()
+ " usedResources: " + usedResources.getMemory() + " usedResources: " + queueUsage.getUsed().getMemory()
+ " clusterResources: " + clusterResource.getMemory() + " clusterResources: " + clusterResource.getMemory()
+ " reservedResources: " + reservedResources.getMemory() + " reservedResources: " + reservedResources.getMemory()
+ " currentCapacity " + ((float) usedResources.getMemory()) + " currentCapacity " + ((float) queueUsage.getUsed().getMemory())
/ clusterResource.getMemory() / clusterResource.getMemory()
+ " potentialNewWithoutReservedCapacity: " + " potentialNewWithoutReservedCapacity: "
+ capacityWithoutReservedCapacity + " ( " + " max-capacity: " + capacityWithoutReservedCapacity + " ( " + " max-capacity: "
@ -645,7 +642,7 @@ public class ParentQueue extends AbstractCSQueue {
" queue=" + getQueueName() + " queue=" + getQueueName() +
" usedCapacity=" + getUsedCapacity() + " usedCapacity=" + getUsedCapacity() +
" absoluteUsedCapacity=" + getAbsoluteUsedCapacity() + " absoluteUsedCapacity=" + getAbsoluteUsedCapacity() +
" used=" + usedResources + " used=" + queueUsage.getUsed() +
" cluster=" + clusterResource); " cluster=" + clusterResource);
} }
@ -735,7 +732,7 @@ public class ParentQueue extends AbstractCSQueue {
.getResource(), node.getLabels()); .getResource(), node.getLabels());
LOG.info("movedContainer" + " queueMoveIn=" + getQueueName() LOG.info("movedContainer" + " queueMoveIn=" + getQueueName()
+ " usedCapacity=" + getUsedCapacity() + " absoluteUsedCapacity=" + " usedCapacity=" + getUsedCapacity() + " absoluteUsedCapacity="
+ getAbsoluteUsedCapacity() + " used=" + usedResources + " cluster=" + getAbsoluteUsedCapacity() + " used=" + queueUsage.getUsed() + " cluster="
+ clusterResource); + clusterResource);
// Inform the parent // Inform the parent
if (parent != null) { if (parent != null) {
@ -755,7 +752,7 @@ public class ParentQueue extends AbstractCSQueue {
node.getLabels()); node.getLabels());
LOG.info("movedContainer" + " queueMoveOut=" + getQueueName() LOG.info("movedContainer" + " queueMoveOut=" + getQueueName()
+ " usedCapacity=" + getUsedCapacity() + " absoluteUsedCapacity=" + " usedCapacity=" + getUsedCapacity() + " absoluteUsedCapacity="
+ getAbsoluteUsedCapacity() + " used=" + usedResources + " cluster=" + getAbsoluteUsedCapacity() + " used=" + queueUsage.getUsed() + " cluster="
+ clusterResource); + clusterResource);
// Inform the parent // Inform the parent
if (parent != null) { if (parent != null) {

View File

@ -298,7 +298,7 @@ public class TestWorkPreservingRMRestart {
1e-8); 1e-8);
// assert user consumed resources. // assert user consumed resources.
assertEquals(usedResource, leafQueue.getUser(app.getUser()) assertEquals(usedResource, leafQueue.getUser(app.getUser())
.getTotalConsumedResources()); .getUsed());
} }
private void checkFifoQueue(SchedulerApplication schedulerApp, private void checkFifoQueue(SchedulerApplication schedulerApp,

View File

@ -202,33 +202,33 @@ public class TestCSQueueUtils {
LOG.info("t2 l2q2 " + result); LOG.info("t2 l2q2 " + result);
//some usage, but below the base capacity //some usage, but below the base capacity
Resources.addTo(root.getUsedResources(), Resources.multiply(clusterResource, 0.1f)); root.getResourceUsage().incUsed(Resources.multiply(clusterResource, 0.1f));
Resources.addTo(l1q2.getUsedResources(), Resources.multiply(clusterResource, 0.1f)); l1q2.getResourceUsage().incUsed(Resources.multiply(clusterResource, 0.1f));
result = CSQueueUtils.getAbsoluteMaxAvailCapacity( result = CSQueueUtils.getAbsoluteMaxAvailCapacity(
resourceCalculator, clusterResource, l2q2); resourceCalculator, clusterResource, l2q2);
assertEquals( 0.4f, result, 0.000001f); assertEquals( 0.4f, result, 0.000001f);
LOG.info("t2 l2q2 " + result); LOG.info("t2 l2q2 " + result);
//usage gt base on parent sibling //usage gt base on parent sibling
Resources.addTo(root.getUsedResources(), Resources.multiply(clusterResource, 0.3f)); root.getResourceUsage().incUsed(Resources.multiply(clusterResource, 0.3f));
Resources.addTo(l1q2.getUsedResources(), Resources.multiply(clusterResource, 0.3f)); l1q2.getResourceUsage().incUsed(Resources.multiply(clusterResource, 0.3f));
result = CSQueueUtils.getAbsoluteMaxAvailCapacity( result = CSQueueUtils.getAbsoluteMaxAvailCapacity(
resourceCalculator, clusterResource, l2q2); resourceCalculator, clusterResource, l2q2);
assertEquals( 0.3f, result, 0.000001f); assertEquals( 0.3f, result, 0.000001f);
LOG.info("t2 l2q2 " + result); LOG.info("t2 l2q2 " + result);
//same as last, but with usage also on direct parent //same as last, but with usage also on direct parent
Resources.addTo(root.getUsedResources(), Resources.multiply(clusterResource, 0.1f)); root.getResourceUsage().incUsed(Resources.multiply(clusterResource, 0.1f));
Resources.addTo(l1q1.getUsedResources(), Resources.multiply(clusterResource, 0.1f)); l1q1.getResourceUsage().incUsed(Resources.multiply(clusterResource, 0.1f));
result = CSQueueUtils.getAbsoluteMaxAvailCapacity( result = CSQueueUtils.getAbsoluteMaxAvailCapacity(
resourceCalculator, clusterResource, l2q2); resourceCalculator, clusterResource, l2q2);
assertEquals( 0.3f, result, 0.000001f); assertEquals( 0.3f, result, 0.000001f);
LOG.info("t2 l2q2 " + result); LOG.info("t2 l2q2 " + result);
//add to direct sibling, below the threshold of effect at present //add to direct sibling, below the threshold of effect at present
Resources.addTo(root.getUsedResources(), Resources.multiply(clusterResource, 0.2f)); root.getResourceUsage().incUsed(Resources.multiply(clusterResource, 0.2f));
Resources.addTo(l1q1.getUsedResources(), Resources.multiply(clusterResource, 0.2f)); l1q1.getResourceUsage().incUsed(Resources.multiply(clusterResource, 0.2f));
Resources.addTo(l2q1.getUsedResources(), Resources.multiply(clusterResource, 0.2f)); l2q1.getResourceUsage().incUsed(Resources.multiply(clusterResource, 0.2f));
result = CSQueueUtils.getAbsoluteMaxAvailCapacity( result = CSQueueUtils.getAbsoluteMaxAvailCapacity(
resourceCalculator, clusterResource, l2q2); resourceCalculator, clusterResource, l2q2);
assertEquals( 0.3f, result, 0.000001f); assertEquals( 0.3f, result, 0.000001f);
@ -236,9 +236,9 @@ public class TestCSQueueUtils {
//add to direct sibling, now above the threshold of effect //add to direct sibling, now above the threshold of effect
//(it's cumulative with prior tests) //(it's cumulative with prior tests)
Resources.addTo(root.getUsedResources(), Resources.multiply(clusterResource, 0.2f)); root.getResourceUsage().incUsed(Resources.multiply(clusterResource, 0.2f));
Resources.addTo(l1q1.getUsedResources(), Resources.multiply(clusterResource, 0.2f)); l1q1.getResourceUsage().incUsed(Resources.multiply(clusterResource, 0.2f));
Resources.addTo(l2q1.getUsedResources(), Resources.multiply(clusterResource, 0.2f)); l2q1.getResourceUsage().incUsed(Resources.multiply(clusterResource, 0.2f));
result = CSQueueUtils.getAbsoluteMaxAvailCapacity( result = CSQueueUtils.getAbsoluteMaxAvailCapacity(
resourceCalculator, clusterResource, l2q2); resourceCalculator, clusterResource, l2q2);
assertEquals( 0.1f, result, 0.000001f); assertEquals( 0.1f, result, 0.000001f);