From d9281fbbab4a2e0475789e724f21faa52a465f62 Mon Sep 17 00:00:00 2001 From: Jian He Date: Fri, 30 Jan 2015 15:15:20 -0800 Subject: [PATCH] YARN-3099. Capacity Scheduler LeafQueue/ParentQueue should use ResourceUsage to track used-resources-by-label. Contributed by Wangda Tan (cherry picked from commit 86358221fc85a7743052a0b4c1647353508bf308) (cherry picked from commit cabf97ae4f2dad53c7b9e3d10a67876b16d94074) --- hadoop-yarn-project/CHANGES.txt | 3 + .../scheduler/capacity/AbstractCSQueue.java | 48 +++---- .../scheduler/capacity/LeafQueue.java | 128 +++++++----------- .../scheduler/capacity/ParentQueue.java | 25 ++-- .../TestWorkPreservingRMRestart.java | 2 +- .../scheduler/capacity/TestCSQueueUtils.java | 24 ++-- 6 files changed, 93 insertions(+), 137 deletions(-) diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 975e4e1c007..bac13ff7c23 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -180,6 +180,9 @@ Release 2.6.1 - UNRELEASED YARN-2978. Fixed potential NPE while getting queue info. (Varun Saxena via jianhe) + YARN-3099. Capacity Scheduler LeafQueue/ParentQueue should use ResourceUsage + to track used-resources-by-label.(Wangda Tan via jianhe) + Release 2.6.0 - 2014-11-18 INCOMPATIBLE CHANGES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java index fec3a567744..3c1663fbb6b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java @@ -35,10 +35,11 @@ import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; -import org.apache.hadoop.yarn.util.resource.Resources; +import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Sets; public abstract class AbstractCSQueue implements CSQueue { @@ -64,16 +65,17 @@ public abstract class AbstractCSQueue implements CSQueue { Set accessibleLabels; RMNodeLabelsManager labelManager; String defaultLabelExpression; - Resource usedResources = Resources.createResource(0, 0); Map absoluteCapacityByNodeLabels; Map capacitiyByNodeLabels; - Map usedResourcesByNodeLabels = new HashMap(); Map absoluteMaxCapacityByNodeLabels; Map maxCapacityByNodeLabels; Map acls = new HashMap(); boolean reservationsContinueLooking; + + // Track resource usage-by-label like used-resource/pending-resource, etc. + ResourceUsage queueUsage; private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); @@ -120,6 +122,7 @@ public abstract class AbstractCSQueue implements CSQueue { maxCapacityByNodeLabels = cs.getConfiguration().getMaximumNodeLabelCapacities(getQueuePath(), accessibleLabels, labelManager); + queueUsage = new ResourceUsage(); } @Override @@ -153,8 +156,8 @@ public abstract class AbstractCSQueue implements CSQueue { } @Override - public synchronized Resource getUsedResources() { - return usedResources; + public Resource getUsedResources() { + return queueUsage.getUsed(); } public synchronized int getNumContainers() { @@ -344,22 +347,13 @@ public abstract class AbstractCSQueue implements CSQueue { synchronized void allocateResource(Resource clusterResource, Resource resource, Set nodeLabels) { - Resources.addTo(usedResources, resource); // Update usedResources by labels if (nodeLabels == null || nodeLabels.isEmpty()) { - if (!usedResourcesByNodeLabels.containsKey(RMNodeLabelsManager.NO_LABEL)) { - usedResourcesByNodeLabels.put(RMNodeLabelsManager.NO_LABEL, - Resources.createResource(0)); - } - Resources.addTo(usedResourcesByNodeLabels.get(RMNodeLabelsManager.NO_LABEL), - resource); + queueUsage.incUsed(resource); } else { for (String label : Sets.intersection(accessibleLabels, nodeLabels)) { - if (!usedResourcesByNodeLabels.containsKey(label)) { - usedResourcesByNodeLabels.put(label, Resources.createResource(0)); - } - Resources.addTo(usedResourcesByNodeLabels.get(label), resource); + queueUsage.incUsed(label, resource); } } @@ -370,23 +364,12 @@ public abstract class AbstractCSQueue implements CSQueue { protected synchronized void releaseResource(Resource clusterResource, Resource resource, Set nodeLabels) { - // Update queue metrics - Resources.subtractFrom(usedResources, resource); - // Update usedResources by labels if (null == nodeLabels || nodeLabels.isEmpty()) { - if (!usedResourcesByNodeLabels.containsKey(RMNodeLabelsManager.NO_LABEL)) { - usedResourcesByNodeLabels.put(RMNodeLabelsManager.NO_LABEL, - Resources.createResource(0)); - } - Resources.subtractFrom( - usedResourcesByNodeLabels.get(RMNodeLabelsManager.NO_LABEL), resource); + queueUsage.decUsed(resource); } else { for (String label : Sets.intersection(accessibleLabels, nodeLabels)) { - if (!usedResourcesByNodeLabels.containsKey(label)) { - usedResourcesByNodeLabels.put(label, Resources.createResource(0)); - } - Resources.subtractFrom(usedResourcesByNodeLabels.get(label), resource); + queueUsage.decUsed(label, resource); } } @@ -452,6 +435,11 @@ public abstract class AbstractCSQueue implements CSQueue { @Private public Resource getUsedResourceByLabel(String nodeLabel) { - return usedResourcesByNodeLabels.get(nodeLabel); + return queueUsage.getUsed(nodeLabel); + } + + @VisibleForTesting + public ResourceUsage getResourceUsage() { + return queueUsage; } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index 78b5fb6d652..6fbc8e45d44 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -61,6 +61,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEven import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; @@ -117,10 +118,6 @@ public class LeafQueue extends AbstractCSQueue { private volatile float absoluteMaxAvailCapacity; - // sum of resources used by application masters for applications - // running in this queue - private final Resource usedAMResources = Resource.newInstance(0, 0); - public LeafQueue(CapacitySchedulerContext cs, String queueName, CSQueue parent, CSQueue old) throws IOException { super(cs, queueName, parent, old); @@ -440,7 +437,7 @@ public class LeafQueue extends AbstractCSQueue { return queueName + ": " + "capacity=" + capacity + ", " + "absoluteCapacity=" + absoluteCapacity + ", " + - "usedResources=" + usedResources + ", " + + "usedResources=" + queueUsage.getUsed() + ", " + "usedCapacity=" + getUsedCapacity() + ", " + "absoluteUsedCapacity=" + getAbsoluteUsedCapacity() + ", " + "numApps=" + getNumApplications() + ", " + @@ -469,7 +466,7 @@ public class LeafQueue extends AbstractCSQueue { ArrayList usersToReturn = new ArrayList(); for (Map.Entry entry: users.entrySet()) { usersToReturn.add(new UserInfo(entry.getKey(), Resources.clone( - entry.getValue().consumed), entry.getValue().getActiveApplications(), + entry.getValue().getUsed()), entry.getValue().getActiveApplications(), entry.getValue().getPendingApplications())); } return usersToReturn; @@ -638,7 +635,7 @@ public class LeafQueue extends AbstractCSQueue { // Check am resource limit Resource amIfStarted = - Resources.add(application.getAMResource(), usedAMResources); + Resources.add(application.getAMResource(), queueUsage.getAMUsed()); if (LOG.isDebugEnabled()) { LOG.debug("application AMResource " + application.getAMResource() + @@ -683,9 +680,8 @@ public class LeafQueue extends AbstractCSQueue { } user.activateApplication(); activeApplications.add(application); - Resources.addTo(usedAMResources, application.getAMResource()); - Resources.addTo(user.getConsumedAMResources(), - application.getAMResource()); + queueUsage.incAMUsed(application.getAMResource()); + user.getResourceUsage().incAMUsed(application.getAMResource()); i.remove(); LOG.info("Application " + application.getApplicationId() + " from user: " + application.getUser() + @@ -736,9 +732,8 @@ public class LeafQueue extends AbstractCSQueue { if (!wasActive) { pendingApplications.remove(application); } else { - Resources.subtractFrom(usedAMResources, application.getAMResource()); - Resources.subtractFrom(user.getConsumedAMResources(), - application.getAMResource()); + queueUsage.decAMUsed(application.getAMResource()); + user.getResourceUsage().decAMUsed(application.getAMResource()); } applicationAttemptMap.remove(application.getApplicationAttemptId()); @@ -987,8 +982,8 @@ public class LeafQueue extends AbstractCSQueue { */ Resource headroom = Resources.min(resourceCalculator, clusterResource, - Resources.subtract(userLimit, user.getTotalConsumedResources()), - Resources.subtract(queueMaxCap, usedResources) + Resources.subtract(userLimit, user.getUsed()), + Resources.subtract(queueMaxCap, queueUsage.getUsed()) ); return headroom; } @@ -1008,12 +1003,8 @@ public class LeafQueue extends AbstractCSQueue { boolean canAssign = true; for (String label : labelCanAccess) { - if (!usedResourcesByNodeLabels.containsKey(label)) { - usedResourcesByNodeLabels.put(label, Resources.createResource(0)); - } - Resource potentialTotalCapacity = - Resources.add(usedResourcesByNodeLabels.get(label), required); + Resources.add(queueUsage.getUsed(label), required); float potentialNewCapacity = Resources.divide(resourceCalculator, clusterResource, @@ -1036,14 +1027,14 @@ public class LeafQueue extends AbstractCSQueue { LOG.debug("try to use reserved: " + getQueueName() + " usedResources: " - + usedResources + + queueUsage.getUsed() + " clusterResources: " + clusterResource + " reservedResources: " + application.getCurrentReservation() + " currentCapacity " + Resources.divide(resourceCalculator, clusterResource, - usedResources, clusterResource) + " required " + required + queueUsage.getUsed(), clusterResource) + " required " + required + " potentialNewWithoutReservedCapacity: " + potentialNewWithoutReservedCapacity + " ( " + " max-capacity: " + absoluteMaxCapacity + ")"); @@ -1063,11 +1054,11 @@ public class LeafQueue extends AbstractCSQueue { if (LOG.isDebugEnabled()) { LOG.debug(getQueueName() + "Check assign to queue, label=" + label - + " usedResources: " + usedResourcesByNodeLabels.get(label) + + " usedResources: " + queueUsage.getUsed(label) + " clusterResources: " + clusterResource + " currentCapacity " + Resources.divide(resourceCalculator, clusterResource, - usedResourcesByNodeLabels.get(label), + queueUsage.getUsed(label), labelManager.getResourceByLabel(label, clusterResource)) + " potentialNewCapacity: " + potentialNewCapacity + " ( " + " max-capacity: " + absoluteMaxCapacity + ")"); @@ -1118,7 +1109,7 @@ public class LeafQueue extends AbstractCSQueue { LOG.debug("Headroom calculation for user " + user + ": " + " userLimit=" + userLimit + " queueMaxCap=" + queueMaxCap + - " consumed=" + queueUser.getTotalConsumedResources() + + " consumed=" + queueUser.getUsed() + " headroom=" + headroom); } @@ -1173,8 +1164,8 @@ public class LeafQueue extends AbstractCSQueue { Resource currentCapacity = Resources.lessThan(resourceCalculator, clusterResource, - usedResources, queueCapacity) ? - queueCapacity : Resources.add(usedResources, required); + queueUsage.getUsed(), queueCapacity) ? + queueCapacity : Resources.add(queueUsage.getUsed(), required); // Never allow a single user to take more than the // queue's configured capacity * user-limit-factor. @@ -1209,10 +1200,10 @@ public class LeafQueue extends AbstractCSQueue { " userLimit=" + userLimit + " userLimitFactor=" + userLimitFactor + " required: " + required + - " consumed: " + user.getTotalConsumedResources() + + " consumed: " + user.getUsed() + " limit: " + limit + " queueCapacity: " + queueCapacity + - " qconsumed: " + usedResources + + " qconsumed: " + queueUsage.getUsed() + " currentCapacity: " + currentCapacity + " activeUsers: " + activeUsers + " clusterCapacity: " + clusterResource @@ -1237,7 +1228,7 @@ public class LeafQueue extends AbstractCSQueue { // overhead of the AM, but it's a > check, not a >= check, so... if (Resources .greaterThan(resourceCalculator, clusterResource, - user.getConsumedResourceByLabel(label), + user.getUsed(label), limit)) { // if enabled, check to see if could we potentially use this node instead // of a reserved node if the application has reserved containers @@ -1245,13 +1236,13 @@ public class LeafQueue extends AbstractCSQueue { if (Resources.lessThanOrEqual( resourceCalculator, clusterResource, - Resources.subtract(user.getTotalConsumedResources(), + Resources.subtract(user.getUsed(), application.getCurrentReservation()), limit)) { if (LOG.isDebugEnabled()) { LOG.debug("User " + userName + " in queue " + getQueueName() + " will exceed limit based on reservations - " + " consumed: " - + user.getTotalConsumedResources() + " reserved: " + + user.getUsed() + " reserved: " + application.getCurrentReservation() + " limit: " + limit); } return true; @@ -1260,7 +1251,7 @@ public class LeafQueue extends AbstractCSQueue { if (LOG.isDebugEnabled()) { LOG.debug("User " + userName + " in queue " + getQueueName() + " will exceed limit - " + " consumed: " - + user.getTotalConsumedResources() + " limit: " + limit); + + user.getUsed() + " limit: " + limit); } return false; } @@ -1682,7 +1673,7 @@ public class LeafQueue extends AbstractCSQueue { " queue=" + this.toString() + " usedCapacity=" + getUsedCapacity() + " absoluteUsedCapacity=" + getAbsoluteUsedCapacity() + - " used=" + usedResources + + " used=" + queueUsage.getUsed() + " cluster=" + clusterResource); return request.getCapability(); @@ -1783,9 +1774,9 @@ public class LeafQueue extends AbstractCSQueue { if (LOG.isDebugEnabled()) { LOG.info(getQueueName() + " user=" + userName + - " used=" + usedResources + " numContainers=" + numContainers + + " used=" + queueUsage.getUsed() + " numContainers=" + numContainers + " headroom = " + application.getHeadroom() + - " user-resources=" + user.getTotalConsumedResources() + " user-resources=" + user.getUsed() ); } } @@ -1801,8 +1792,8 @@ public class LeafQueue extends AbstractCSQueue { metrics.setAvailableResourcesToUser(userName, application.getHeadroom()); LOG.info(getQueueName() + - " used=" + usedResources + " numContainers=" + numContainers + - " user=" + userName + " user-resources=" + user.getTotalConsumedResources()); + " used=" + queueUsage.getUsed() + " numContainers=" + numContainers + + " user=" + userName + " user-resources=" + user.getUsed()); } private void updateAbsoluteCapacityResource(Resource clusterResource) { @@ -1844,22 +1835,20 @@ public class LeafQueue extends AbstractCSQueue { @VisibleForTesting public static class User { - Resource consumed = Resources.createResource(0, 0); - Resource consumedAMResources = Resources.createResource(0, 0); - Map consumedByLabel = new HashMap(); + ResourceUsage userResourceUsage = new ResourceUsage(); int pendingApplications = 0; int activeApplications = 0; - public Resource getTotalConsumedResources() { - return consumed; + public ResourceUsage getResourceUsage() { + return userResourceUsage; } - public Resource getConsumedResourceByLabel(String label) { - Resource r = consumedByLabel.get(label); - if (null != r) { - return r; - } - return Resources.none(); + public Resource getUsed() { + return userResourceUsage.getUsed(); + } + + public Resource getUsed(String label) { + return userResourceUsage.getUsed(label); } public int getPendingApplications() { @@ -1871,7 +1860,7 @@ public class LeafQueue extends AbstractCSQueue { } public Resource getConsumedAMResources() { - return consumedAMResources; + return userResourceUsage.getAMUsed(); } public int getTotalApplications() { @@ -1896,47 +1885,26 @@ public class LeafQueue extends AbstractCSQueue { } } - public synchronized void assignContainer(Resource resource, + public void assignContainer(Resource resource, Set nodeLabels) { - Resources.addTo(consumed, resource); - if (nodeLabels == null || nodeLabels.isEmpty()) { - if (!consumedByLabel.containsKey(RMNodeLabelsManager.NO_LABEL)) { - consumedByLabel.put(RMNodeLabelsManager.NO_LABEL, - Resources.createResource(0)); - } - Resources.addTo(consumedByLabel.get(RMNodeLabelsManager.NO_LABEL), - resource); + userResourceUsage.incUsed(resource); } else { for (String label : nodeLabels) { - if (!consumedByLabel.containsKey(label)) { - consumedByLabel.put(label, Resources.createResource(0)); - } - Resources.addTo(consumedByLabel.get(label), resource); + userResourceUsage.incUsed(label, resource); } } } - public synchronized void releaseContainer(Resource resource, Set nodeLabels) { - Resources.subtractFrom(consumed, resource); - - // Update usedResources by labels + public void releaseContainer(Resource resource, Set nodeLabels) { if (nodeLabels == null || nodeLabels.isEmpty()) { - if (!consumedByLabel.containsKey(RMNodeLabelsManager.NO_LABEL)) { - consumedByLabel.put(RMNodeLabelsManager.NO_LABEL, - Resources.createResource(0)); - } - Resources.subtractFrom( - consumedByLabel.get(RMNodeLabelsManager.NO_LABEL), resource); + userResourceUsage.decUsed(resource); } else { for (String label : nodeLabels) { - if (!consumedByLabel.containsKey(label)) { - consumedByLabel.put(label, Resources.createResource(0)); - } - Resources.subtractFrom(consumedByLabel.get(label), resource); + userResourceUsage.decUsed(label, resource); } } - } + } } @Override @@ -1995,7 +1963,7 @@ public class LeafQueue extends AbstractCSQueue { + " resource=" + rmContainer.getContainer().getResource() + " queueMoveIn=" + this + " usedCapacity=" + getUsedCapacity() + " absoluteUsedCapacity=" + getAbsoluteUsedCapacity() + " used=" - + usedResources + " cluster=" + clusterResource); + + queueUsage.getUsed() + " cluster=" + clusterResource); // Inform the parent queue getParent().attachContainer(clusterResource, application, rmContainer); } @@ -2013,7 +1981,7 @@ public class LeafQueue extends AbstractCSQueue { + " resource=" + rmContainer.getContainer().getResource() + " queueMoveOut=" + this + " usedCapacity=" + getUsedCapacity() + " absoluteUsedCapacity=" + getAbsoluteUsedCapacity() + " used=" - + usedResources + " cluster=" + clusterResource); + + queueUsage.getUsed() + " cluster=" + clusterResource); // Inform the parent queue getParent().detachContainer(clusterResource, application, rmContainer); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java index f820ccab929..de92c9c9ab9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java @@ -256,7 +256,7 @@ public class ParentQueue extends AbstractCSQueue { "numChildQueue= " + childQueues.size() + ", " + "capacity=" + capacity + ", " + "absoluteCapacity=" + absoluteCapacity + ", " + - "usedResources=" + usedResources + + "usedResources=" + queueUsage.getUsed() + "usedCapacity=" + getUsedCapacity() + ", " + "numApps=" + getNumApplications() + ", " + "numContainers=" + getNumContainers(); @@ -463,7 +463,7 @@ public class ParentQueue extends AbstractCSQueue { " queue=" + getQueueName() + " usedCapacity=" + getUsedCapacity() + " absoluteUsedCapacity=" + getAbsoluteUsedCapacity() + - " used=" + usedResources + + " used=" + queueUsage.getUsed() + " cluster=" + clusterResource); } else { @@ -506,19 +506,16 @@ public class ParentQueue extends AbstractCSQueue { boolean canAssign = true; for (String label : labelCanAccess) { - if (!usedResourcesByNodeLabels.containsKey(label)) { - usedResourcesByNodeLabels.put(label, Resources.createResource(0)); - } float currentAbsoluteLabelUsedCapacity = Resources.divide(resourceCalculator, clusterResource, - usedResourcesByNodeLabels.get(label), + queueUsage.getUsed(label), labelManager.getResourceByLabel(label, clusterResource)); // if any of the label doesn't beyond limit, we can allocate on this node if (currentAbsoluteLabelUsedCapacity >= getAbsoluteMaximumCapacityByNodeLabel(label)) { if (LOG.isDebugEnabled()) { - LOG.debug(getQueueName() + " used=" + usedResources - + " current-capacity (" + usedResourcesByNodeLabels.get(label) + ") " + LOG.debug(getQueueName() + " used=" + queueUsage.getUsed() + + " current-capacity (" + queueUsage.getUsed(label) + ") " + " >= max-capacity (" + labelManager.getResourceByLabel(label, clusterResource) + ")"); } @@ -540,16 +537,16 @@ public class ParentQueue extends AbstractCSQueue { .getReservedMB(), getMetrics().getReservedVirtualCores()); float capacityWithoutReservedCapacity = Resources.divide( resourceCalculator, clusterResource, - Resources.subtract(usedResources, reservedResources), + Resources.subtract(queueUsage.getUsed(), reservedResources), clusterResource); if (capacityWithoutReservedCapacity <= absoluteMaxCapacity) { if (LOG.isDebugEnabled()) { LOG.debug("parent: try to use reserved: " + getQueueName() - + " usedResources: " + usedResources.getMemory() + + " usedResources: " + queueUsage.getUsed().getMemory() + " clusterResources: " + clusterResource.getMemory() + " reservedResources: " + reservedResources.getMemory() - + " currentCapacity " + ((float) usedResources.getMemory()) + + " currentCapacity " + ((float) queueUsage.getUsed().getMemory()) / clusterResource.getMemory() + " potentialNewWithoutReservedCapacity: " + capacityWithoutReservedCapacity + " ( " + " max-capacity: " @@ -645,7 +642,7 @@ public class ParentQueue extends AbstractCSQueue { " queue=" + getQueueName() + " usedCapacity=" + getUsedCapacity() + " absoluteUsedCapacity=" + getAbsoluteUsedCapacity() + - " used=" + usedResources + + " used=" + queueUsage.getUsed() + " cluster=" + clusterResource); } @@ -735,7 +732,7 @@ public class ParentQueue extends AbstractCSQueue { .getResource(), node.getLabels()); LOG.info("movedContainer" + " queueMoveIn=" + getQueueName() + " usedCapacity=" + getUsedCapacity() + " absoluteUsedCapacity=" - + getAbsoluteUsedCapacity() + " used=" + usedResources + " cluster=" + + getAbsoluteUsedCapacity() + " used=" + queueUsage.getUsed() + " cluster=" + clusterResource); // Inform the parent if (parent != null) { @@ -755,7 +752,7 @@ public class ParentQueue extends AbstractCSQueue { node.getLabels()); LOG.info("movedContainer" + " queueMoveOut=" + getQueueName() + " usedCapacity=" + getUsedCapacity() + " absoluteUsedCapacity=" - + getAbsoluteUsedCapacity() + " used=" + usedResources + " cluster=" + + getAbsoluteUsedCapacity() + " used=" + queueUsage.getUsed() + " cluster=" + clusterResource); // Inform the parent if (parent != null) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java index 354a5abd28e..5c5c2dfb8ed 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java @@ -298,7 +298,7 @@ public class TestWorkPreservingRMRestart { 1e-8); // assert user consumed resources. assertEquals(usedResource, leafQueue.getUser(app.getUser()) - .getTotalConsumedResources()); + .getUsed()); } private void checkFifoQueue(SchedulerApplication schedulerApp, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCSQueueUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCSQueueUtils.java index a62889b3d59..d643c9dc0ef 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCSQueueUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCSQueueUtils.java @@ -202,33 +202,33 @@ public class TestCSQueueUtils { LOG.info("t2 l2q2 " + result); //some usage, but below the base capacity - Resources.addTo(root.getUsedResources(), Resources.multiply(clusterResource, 0.1f)); - Resources.addTo(l1q2.getUsedResources(), Resources.multiply(clusterResource, 0.1f)); + root.getResourceUsage().incUsed(Resources.multiply(clusterResource, 0.1f)); + l1q2.getResourceUsage().incUsed(Resources.multiply(clusterResource, 0.1f)); result = CSQueueUtils.getAbsoluteMaxAvailCapacity( resourceCalculator, clusterResource, l2q2); assertEquals( 0.4f, result, 0.000001f); LOG.info("t2 l2q2 " + result); //usage gt base on parent sibling - Resources.addTo(root.getUsedResources(), Resources.multiply(clusterResource, 0.3f)); - Resources.addTo(l1q2.getUsedResources(), Resources.multiply(clusterResource, 0.3f)); + root.getResourceUsage().incUsed(Resources.multiply(clusterResource, 0.3f)); + l1q2.getResourceUsage().incUsed(Resources.multiply(clusterResource, 0.3f)); result = CSQueueUtils.getAbsoluteMaxAvailCapacity( resourceCalculator, clusterResource, l2q2); assertEquals( 0.3f, result, 0.000001f); LOG.info("t2 l2q2 " + result); //same as last, but with usage also on direct parent - Resources.addTo(root.getUsedResources(), Resources.multiply(clusterResource, 0.1f)); - Resources.addTo(l1q1.getUsedResources(), Resources.multiply(clusterResource, 0.1f)); + root.getResourceUsage().incUsed(Resources.multiply(clusterResource, 0.1f)); + l1q1.getResourceUsage().incUsed(Resources.multiply(clusterResource, 0.1f)); result = CSQueueUtils.getAbsoluteMaxAvailCapacity( resourceCalculator, clusterResource, l2q2); assertEquals( 0.3f, result, 0.000001f); LOG.info("t2 l2q2 " + result); //add to direct sibling, below the threshold of effect at present - Resources.addTo(root.getUsedResources(), Resources.multiply(clusterResource, 0.2f)); - Resources.addTo(l1q1.getUsedResources(), Resources.multiply(clusterResource, 0.2f)); - Resources.addTo(l2q1.getUsedResources(), Resources.multiply(clusterResource, 0.2f)); + root.getResourceUsage().incUsed(Resources.multiply(clusterResource, 0.2f)); + l1q1.getResourceUsage().incUsed(Resources.multiply(clusterResource, 0.2f)); + l2q1.getResourceUsage().incUsed(Resources.multiply(clusterResource, 0.2f)); result = CSQueueUtils.getAbsoluteMaxAvailCapacity( resourceCalculator, clusterResource, l2q2); assertEquals( 0.3f, result, 0.000001f); @@ -236,9 +236,9 @@ public class TestCSQueueUtils { //add to direct sibling, now above the threshold of effect //(it's cumulative with prior tests) - Resources.addTo(root.getUsedResources(), Resources.multiply(clusterResource, 0.2f)); - Resources.addTo(l1q1.getUsedResources(), Resources.multiply(clusterResource, 0.2f)); - Resources.addTo(l2q1.getUsedResources(), Resources.multiply(clusterResource, 0.2f)); + root.getResourceUsage().incUsed(Resources.multiply(clusterResource, 0.2f)); + l1q1.getResourceUsage().incUsed(Resources.multiply(clusterResource, 0.2f)); + l2q1.getResourceUsage().incUsed(Resources.multiply(clusterResource, 0.2f)); result = CSQueueUtils.getAbsoluteMaxAvailCapacity( resourceCalculator, clusterResource, l2q2); assertEquals( 0.1f, result, 0.000001f);