YARN-3494. Expose AM resource limit and usage in CS QueueMetrics. Contributed by Rohith Sharmaks

This commit is contained in:
Jian He 2015-04-21 20:06:20 -07:00
parent e71d0d87d9
commit bdd90110e6
7 changed files with 174 additions and 19 deletions

View File

@ -151,6 +151,9 @@ Release 2.8.0 - UNRELEASED
YARN-3451. Display attempt start time and elapsed time on the web UI. YARN-3451. Display attempt start time and elapsed time on the web UI.
(Rohith Sharmaks via jianhe) (Rohith Sharmaks via jianhe)
YARN-3494. Expose AM resource limit and usage in CS QueueMetrics. (Rohith
Sharmaks via jianhe)
OPTIMIZATIONS OPTIMIZATIONS
YARN-3339. TestDockerContainerExecutor should pull a single image and not YARN-3339. TestDockerContainerExecutor should pull a single image and not

View File

@ -83,16 +83,17 @@ public class QueueMetrics implements MetricsSource {
static final MetricsInfo RECORD_INFO = info("QueueMetrics", static final MetricsInfo RECORD_INFO = info("QueueMetrics",
"Metrics for the resource scheduler"); "Metrics for the resource scheduler");
protected static final MetricsInfo QUEUE_INFO = info("Queue", "Metrics by queue"); protected static final MetricsInfo QUEUE_INFO = info("Queue", "Metrics by queue");
static final MetricsInfo USER_INFO = info("User", "Metrics by user"); protected static final MetricsInfo USER_INFO =
info("User", "Metrics by user");
static final Splitter Q_SPLITTER = static final Splitter Q_SPLITTER =
Splitter.on('.').omitEmptyStrings().trimResults(); Splitter.on('.').omitEmptyStrings().trimResults();
final MetricsRegistry registry; protected final MetricsRegistry registry;
final String queueName; protected final String queueName;
final QueueMetrics parent; protected final QueueMetrics parent;
final MetricsSystem metricsSystem; protected final MetricsSystem metricsSystem;
private final Map<String, QueueMetrics> users; protected final Map<String, QueueMetrics> users;
private final Configuration conf; protected final Configuration conf;
protected QueueMetrics(MetricsSystem ms, String queueName, Queue parent, protected QueueMetrics(MetricsSystem ms, String queueName, Queue parent,
boolean enableUserMetrics, Configuration conf) { boolean enableUserMetrics, Configuration conf) {

View File

@ -43,7 +43,6 @@ import org.apache.hadoop.yarn.security.PrivilegedEntity.EntityType;
import org.apache.hadoop.yarn.security.YarnAuthorizationProvider; import org.apache.hadoop.yarn.security.YarnAuthorizationProvider;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
@ -67,7 +66,7 @@ public abstract class AbstractCSQueue implements CSQueue {
final Resource minimumAllocation; final Resource minimumAllocation;
Resource maximumAllocation; Resource maximumAllocation;
QueueState state; QueueState state;
final QueueMetrics metrics; final CSQueueMetrics metrics;
protected final PrivilegedEntity queueEntity; protected final PrivilegedEntity queueEntity;
final ResourceCalculator resourceCalculator; final ResourceCalculator resourceCalculator;
@ -100,10 +99,10 @@ public abstract class AbstractCSQueue implements CSQueue {
this.resourceCalculator = cs.getResourceCalculator(); this.resourceCalculator = cs.getResourceCalculator();
// must be called after parent and queueName is set // must be called after parent and queueName is set
this.metrics = old != null ? old.getMetrics() : this.metrics =
QueueMetrics.forQueue(getQueuePath(), parent, old != null ? (CSQueueMetrics) old.getMetrics() : CSQueueMetrics
cs.getConfiguration().getEnableUserMetrics(), .forQueue(getQueuePath(), parent, cs.getConfiguration()
cs.getConf()); .getEnableUserMetrics(), cs.getConf());
this.csContext = cs; this.csContext = cs;
this.minimumAllocation = csContext.getMinimumResourceCapability(); this.minimumAllocation = csContext.getMinimumResourceCapability();
@ -171,7 +170,7 @@ public abstract class AbstractCSQueue implements CSQueue {
} }
@Override @Override
public QueueMetrics getMetrics() { public CSQueueMetrics getMetrics() {
return metrics; return metrics;
} }

View File

@ -0,0 +1,133 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.metrics2.MetricsSystem;
import org.apache.hadoop.metrics2.annotation.Metric;
import org.apache.hadoop.metrics2.annotation.Metrics;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.metrics2.lib.MutableGaugeInt;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
@Metrics(context = "yarn")
public class CSQueueMetrics extends QueueMetrics {
@Metric("AM memory limit in MB")
MutableGaugeInt AMResourceLimitMB;
@Metric("AM CPU limit in virtual cores")
MutableGaugeInt AMResourceLimitVCores;
@Metric("Used AM memory limit in MB")
MutableGaugeInt usedAMResourceMB;
@Metric("Used AM CPU limit in virtual cores")
MutableGaugeInt usedAMResourceVCores;
CSQueueMetrics(MetricsSystem ms, String queueName, Queue parent,
boolean enableUserMetrics, Configuration conf) {
super(ms, queueName, parent, enableUserMetrics, conf);
}
public int getAMResourceLimitMB() {
return AMResourceLimitMB.value();
}
public int getAMResourceLimitVCores() {
return AMResourceLimitVCores.value();
}
public int getUsedAMResourceMB() {
return usedAMResourceMB.value();
}
public int getUsedAMResourceVCores() {
return usedAMResourceVCores.value();
}
public void setAMResouceLimit(Resource res) {
AMResourceLimitMB.set(res.getMemory());
AMResourceLimitVCores.set(res.getVirtualCores());
}
public void setAMResouceLimitForUser(String user, Resource res) {
CSQueueMetrics userMetrics = (CSQueueMetrics) getUserMetrics(user);
if (userMetrics != null) {
userMetrics.setAMResouceLimit(res);
}
}
public void incAMUsed(String user, Resource res) {
usedAMResourceMB.incr(res.getMemory());
usedAMResourceVCores.incr(res.getVirtualCores());
CSQueueMetrics userMetrics = (CSQueueMetrics) getUserMetrics(user);
if (userMetrics != null) {
userMetrics.incAMUsed(user, res);
}
}
public void decAMUsed(String user, Resource res) {
usedAMResourceMB.decr(res.getMemory());
usedAMResourceVCores.decr(res.getVirtualCores());
CSQueueMetrics userMetrics = (CSQueueMetrics) getUserMetrics(user);
if (userMetrics != null) {
userMetrics.decAMUsed(user, res);
}
}
public synchronized static CSQueueMetrics forQueue(String queueName,
Queue parent, boolean enableUserMetrics, Configuration conf) {
MetricsSystem ms = DefaultMetricsSystem.instance();
QueueMetrics metrics = queueMetrics.get(queueName);
if (metrics == null) {
metrics =
new CSQueueMetrics(ms, queueName, parent, enableUserMetrics, conf)
.tag(QUEUE_INFO, queueName);
// Register with the MetricsSystems
if (ms != null) {
metrics =
ms.register(sourceName(queueName).toString(), "Metrics for queue: "
+ queueName, metrics);
}
queueMetrics.put(queueName, metrics);
}
return (CSQueueMetrics) metrics;
}
@Override
public synchronized QueueMetrics getUserMetrics(String userName) {
if (users == null) {
return null;
}
CSQueueMetrics metrics = (CSQueueMetrics) users.get(userName);
if (metrics == null) {
metrics = new CSQueueMetrics(metricsSystem, queueName, null, false, conf);
users.put(userName, metrics);
metricsSystem.register(
sourceName(queueName).append(",user=").append(userName).toString(),
"Metrics for user '" + userName + "' in queue '" + queueName + "'",
((CSQueueMetrics) metrics.tag(QUEUE_INFO, queueName)).tag(USER_INFO,
userName));
}
return metrics;
}
}

View File

@ -563,10 +563,12 @@ public class LeafQueue extends AbstractCSQueue {
} }
Resource queueCap = Resources.max(resourceCalculator, lastClusterResource, Resource queueCap = Resources.max(resourceCalculator, lastClusterResource,
absoluteCapacityResource, queueCurrentLimit); absoluteCapacityResource, queueCurrentLimit);
return Resources.multiplyAndNormalizeUp( Resource amResouceLimit =
resourceCalculator, Resources.multiplyAndNormalizeUp(resourceCalculator, queueCap,
queueCap, maxAMResourcePerQueuePercent, minimumAllocation);
maxAMResourcePerQueuePercent, minimumAllocation);
metrics.setAMResouceLimit(amResouceLimit);
return amResouceLimit;
} }
public synchronized Resource getUserAMResourceLimit() { public synchronized Resource getUserAMResourceLimit() {
@ -645,6 +647,8 @@ public class LeafQueue extends AbstractCSQueue {
orderingPolicy.addSchedulableEntity(application); orderingPolicy.addSchedulableEntity(application);
queueUsage.incAMUsed(application.getAMResource()); queueUsage.incAMUsed(application.getAMResource());
user.getResourceUsage().incAMUsed(application.getAMResource()); user.getResourceUsage().incAMUsed(application.getAMResource());
metrics.incAMUsed(application.getUser(), application.getAMResource());
metrics.setAMResouceLimitForUser(application.getUser(), userAMLimit);
i.remove(); i.remove();
LOG.info("Application " + application.getApplicationId() + LOG.info("Application " + application.getApplicationId() +
" from user: " + application.getUser() + " from user: " + application.getUser() +
@ -698,6 +702,7 @@ public class LeafQueue extends AbstractCSQueue {
} else { } else {
queueUsage.decAMUsed(application.getAMResource()); queueUsage.decAMUsed(application.getAMResource());
user.getResourceUsage().decAMUsed(application.getAMResource()); user.getResourceUsage().decAMUsed(application.getAMResource());
metrics.decAMUsed(application.getUser(), application.getAMResource());
} }
applicationAttemptMap.remove(application.getApplicationAttemptId()); applicationAttemptMap.remove(application.getApplicationAttemptId());

View File

@ -278,10 +278,18 @@ public class TestApplicationLimits {
" UserAMResourceLimit=" + " UserAMResourceLimit=" +
queue.getUserAMResourceLimit()); queue.getUserAMResourceLimit());
assertEquals(queue.getAMResourceLimit(), Resource.newInstance(160*GB, 1)); Resource amResourceLimit = Resource.newInstance(160 * GB, 1);
assertEquals(queue.getAMResourceLimit(), amResourceLimit);
assertEquals(queue.getAMResourceLimit(), amResourceLimit);
assertEquals(queue.getUserAMResourceLimit(), assertEquals(queue.getUserAMResourceLimit(),
Resource.newInstance(80*GB, 1)); Resource.newInstance(80*GB, 1));
// Assert in metrics
assertEquals(queue.getMetrics().getAMResourceLimitMB(),
amResourceLimit.getMemory());
assertEquals(queue.getMetrics().getAMResourceLimitVCores(),
amResourceLimit.getVirtualCores());
assertEquals( assertEquals(
(int)(clusterResource.getMemory() * queue.getAbsoluteCapacity()), (int)(clusterResource.getMemory() * queue.getAbsoluteCapacity()),
queue.getMetrics().getAvailableMB() queue.getMetrics().getAvailableMB()

View File

@ -432,10 +432,16 @@ public class TestLeafQueue {
.getMockApplicationAttemptId(0, 2); .getMockApplicationAttemptId(0, 2);
FiCaSchedulerApp app_1 = new FiCaSchedulerApp(appAttemptId_1, user_0, a, null, FiCaSchedulerApp app_1 = new FiCaSchedulerApp(appAttemptId_1, user_0, a, null,
spyRMContext); spyRMContext);
app_1.setAMResource(Resource.newInstance(100, 1));
a.submitApplicationAttempt(app_1, user_0); // same user a.submitApplicationAttempt(app_1, user_0); // same user
assertEquals(1, a.getMetrics().getAppsSubmitted()); assertEquals(1, a.getMetrics().getAppsSubmitted());
assertEquals(1, a.getMetrics().getAppsPending()); assertEquals(1, a.getMetrics().getAppsPending());
assertEquals(1, a.getUser(user_0).getActiveApplications());
assertEquals(app_1.getAMResource().getMemory(), a.getMetrics()
.getUsedAMResourceMB());
assertEquals(app_1.getAMResource().getVirtualCores(), a.getMetrics()
.getUsedAMResourceVCores());
event = new AppAttemptRemovedSchedulerEvent(appAttemptId_0, event = new AppAttemptRemovedSchedulerEvent(appAttemptId_0,
RMAppAttemptState.FINISHED, false); RMAppAttemptState.FINISHED, false);