YARN-883. Expose Fair Scheduler-specific queue metrics. (sandyr via tucu)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1497885 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Alejandro Abdelnur 2013-06-28 19:01:48 +00:00
parent 5d45387c00
commit 1731028a94
12 changed files with 168 additions and 53 deletions

View File

@ -24,6 +24,9 @@ Release 2.2.0 - UNRELEASED
YARN-736. Add a multi-resource fair sharing metric. (sandyr via tucu)
YARN-883. Expose Fair Scheduler-specific queue metrics. (sandyr via tucu)
OPTIMIZATIONS
BUG FIXES

View File

@ -80,7 +80,7 @@ public class QueueMetrics implements MetricsSource {
static final Logger LOG = LoggerFactory.getLogger(QueueMetrics.class);
static final MetricsInfo RECORD_INFO = info("QueueMetrics",
"Metrics for the resource scheduler");
static final MetricsInfo QUEUE_INFO = info("Queue", "Metrics by queue");
protected static final MetricsInfo QUEUE_INFO = info("Queue", "Metrics by queue");
static final MetricsInfo USER_INFO = info("User", "Metrics by user");
static final Splitter Q_SPLITTER =
Splitter.on('.').omitEmptyStrings().trimResults();
@ -92,7 +92,7 @@ public class QueueMetrics implements MetricsSource {
private final Map<String, QueueMetrics> users;
private final Configuration conf;
QueueMetrics(MetricsSystem ms, String queueName, Queue parent,
protected QueueMetrics(MetricsSystem ms, String queueName, Queue parent,
boolean enableUserMetrics, Configuration conf) {
registry = new MetricsRegistry(RECORD_INFO);
this.queueName = queueName;
@ -104,12 +104,12 @@ public class QueueMetrics implements MetricsSource {
runningTime = buildBuckets(conf);
}
QueueMetrics tag(MetricsInfo info, String value) {
protected QueueMetrics tag(MetricsInfo info, String value) {
registry.tag(info, value);
return this;
}
static StringBuilder sourceName(String queueName) {
protected static StringBuilder sourceName(String queueName) {
StringBuilder sb = new StringBuilder(RECORD_INFO.name());
int i = 0;
for (String node : Q_SPLITTER.split(queueName)) {
@ -128,7 +128,7 @@ static QueueMetrics forQueue(String queueName, Queue parent,
// this method is here because we want to make sure these metrics show up on
// queue registration.
private void initMetrics() {
public void initMetrics() {
appsSubmitted.incr(0);
appsRunning.incr(0);
appsPending.incr(0);
@ -149,7 +149,7 @@ public synchronized static void clearQueueMetrics() {
/**
* Simple metrics cache to help prevent re-registrations.
*/
private static Map<String, QueueMetrics> queueMetrics =
protected final static Map<String, QueueMetrics> queueMetrics =
new HashMap<String, QueueMetrics>();
public synchronized

View File

@ -254,13 +254,6 @@ private Resource assignContainer(FSSchedulerNode node,
}
return Resources.none();
}
else {
// TODO this should subtract resource just assigned
// TEMPROARY
getMetrics().setAvailableResourcesToQueue(
scheduler.getClusterCapacity());
}
// If we had previously made a reservation, delete it
if (reserved) {

View File

@ -58,7 +58,7 @@ public void addChildQueue(FSQueue child) {
public void recomputeShares() {
policy.computeShares(childQueues, getFairShare());
for (FSQueue childQueue : childQueues) {
childQueue.getMetrics().setAvailableResourcesToQueue(childQueue.getFairShare());
childQueue.getMetrics().setFairShare(childQueue.getFairShare());
childQueue.recomputeShares();
}
}

View File

@ -36,7 +36,6 @@
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.util.resource.Resources;
@Private
@ -45,7 +44,7 @@ public abstract class FSQueue extends Schedulable implements Queue {
private final String name;
private final QueueManager queueMgr;
private final FairScheduler scheduler;
private final QueueMetrics metrics;
private final FSQueueMetrics metrics;
protected final FSParentQueue parent;
protected final RecordFactory recordFactory =
@ -58,7 +57,9 @@ public FSQueue(String name, QueueManager queueMgr,
this.name = name;
this.queueMgr = queueMgr;
this.scheduler = scheduler;
this.metrics = QueueMetrics.forQueue(getName(), parent, true, scheduler.getConf());
this.metrics = FSQueueMetrics.forQueue(getName(), parent, true, scheduler.getConf());
metrics.setMinShare(getMinShare());
metrics.setMaxShare(getMaxShare());
this.parent = parent;
}
@ -141,10 +142,16 @@ public Map<QueueACL, AccessControlList> getQueueAcls() {
}
@Override
public QueueMetrics getMetrics() {
public FSQueueMetrics getMetrics() {
return metrics;
}
@Override
public void setFairShare(Resource fairShare) {
super.setFairShare(fairShare);
metrics.setFairShare(fairShare);
}
public boolean hasAccess(QueueACL acl, UserGroupInformation user) {
// Check if the leaf-queue allows access
if (queueMgr.getQueueAcls(getName()).get(acl).isUserAllowed(user)) {

View File

@ -0,0 +1,105 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.metrics2.MetricsSystem;
import org.apache.hadoop.metrics2.annotation.Metric;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.metrics2.lib.MutableGaugeInt;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
public class FSQueueMetrics extends QueueMetrics {
@Metric("Fair share of memory in MB") MutableGaugeInt fairShareMB;
@Metric("Fair share of CPU in vcores") MutableGaugeInt fairShareVCores;
@Metric("Minimum share of memory in MB") MutableGaugeInt minShareMB;
@Metric("Minimum share of CPU in vcores") MutableGaugeInt minShareVCores;
@Metric("Maximum share of memory in MB") MutableGaugeInt maxShareMB;
@Metric("Maximum share of CPU in vcores") MutableGaugeInt maxShareVCores;
FSQueueMetrics(MetricsSystem ms, String queueName, Queue parent,
boolean enableUserMetrics, Configuration conf) {
super(ms, queueName, parent, enableUserMetrics, conf);
}
public void setFairShare(Resource resource) {
fairShareMB.set(resource.getMemory());
fairShareVCores.set(resource.getVirtualCores());
}
public int getFairShareMB() {
return fairShareMB.value();
}
public int getFairShareVirtualCores() {
return fairShareVCores.value();
}
public void setMinShare(Resource resource) {
minShareMB.set(resource.getMemory());
minShareVCores.set(resource.getVirtualCores());
}
public int getMinShareMB() {
return minShareMB.value();
}
public int getMinShareVirtualCores() {
return minShareVCores.value();
}
public void setMaxShare(Resource resource) {
maxShareMB.set(resource.getMemory());
maxShareVCores.set(resource.getVirtualCores());
}
public int getMaxShareMB() {
return maxShareMB.value();
}
public int getMaxShareVirtualCores() {
return maxShareVCores.value();
}
public synchronized
static FSQueueMetrics forQueue(String queueName, Queue parent,
boolean enableUserMetrics, Configuration conf) {
MetricsSystem ms = DefaultMetricsSystem.instance();
QueueMetrics metrics = queueMetrics.get(queueName);
if (metrics == null) {
metrics = new FSQueueMetrics(ms, queueName, parent, enableUserMetrics, conf)
.tag(QUEUE_INFO, queueName);
// Register with the MetricsSystems
if (ms != null) {
metrics = ms.register(
sourceName(queueName).toString(),
"Metrics for queue: " + queueName, metrics);
metrics.initMetrics();
}
queueMetrics.put(queueName, metrics);
}
return (FSQueueMetrics)metrics;
}
}

View File

@ -142,7 +142,7 @@ public class FairScheduler implements ResourceScheduler {
new Allocation(EMPTY_CONTAINER_LIST, Resources.createResource(0));
// Aggregate metrics
QueueMetrics rootMetrics;
FSQueueMetrics rootMetrics;
// Time when we last updated preemption vars
protected long lastPreemptionUpdateTime;
@ -1045,7 +1045,7 @@ public synchronized void reinitialize(Configuration conf, RMContext rmContext)
waitTimeBeforeKill = this.conf.getWaitTimeBeforeKill();
if (!initialized) {
rootMetrics = QueueMetrics.forQueue("root", null, true, conf);
rootMetrics = FSQueueMetrics.forQueue("root", null, true, conf);
this.rmContext = rmContext;
this.eventLog = new FairSchedulerEventLog();
eventLog.init(this.conf);

View File

@ -390,6 +390,13 @@ public void reloadAllocs() throws IOException, ParserConfigurationException,
queueMaxAppsDefault, defaultSchedPolicy, minSharePreemptionTimeouts,
queueAcls, fairSharePreemptionTimeout, defaultMinSharePreemptionTimeout);
// Update metrics
for (FSQueue queue : queues.values()) {
FSQueueMetrics queueMetrics = queue.getMetrics();
queueMetrics.setMinShare(queue.getMinShare());
queueMetrics.setMaxShare(queue.getMaxShare());
}
// Root queue should have empty ACLs. As a queue's ACL is the union of
// its ACL and all its parents' ACLs, setting the roots' to empty will
// neither allow nor prohibit more access to its children.

View File

@ -73,7 +73,7 @@ protected void render(Block html) {
if (maxApps < Integer.MAX_VALUE) {
ri._("Max Running Applications:", qinfo.getMaxApplications());
}
ri._("Fair Share:", qinfo.getFairShare());
ri._("Fair Share:", qinfo.getFairShare().toString());
html._(InfoBlock.class);
@ -95,8 +95,8 @@ public void render(Block html) {
UL<Hamlet> ul = html.ul("#pq");
for (FairSchedulerQueueInfo info : subQueues) {
float capacity = info.getMaxResourcesFraction();
float fairShare = info.getFairShareFraction();
float used = info.getUsedFraction();
float fairShare = info.getFairShareMemoryFraction();
float used = info.getUsedMemoryFraction();
LI<UL<Hamlet>> li = ul.
li().
a(_Q).$style(width(capacity * Q_MAX_WIDTH)).
@ -150,7 +150,7 @@ public void render(Block html) {
} else {
FairSchedulerInfo sinfo = new FairSchedulerInfo(fs);
fsqinfo.qinfo = sinfo.getRootQueueInfo();
float used = fsqinfo.qinfo.getUsedFraction();
float used = fsqinfo.qinfo.getUsedMemoryFraction();
ul.
li().$style("margin-bottom: 1em").

View File

@ -30,20 +30,18 @@
import org.apache.hadoop.yarn.util.resource.Resources;
public class FairSchedulerQueueInfo {
private int fairShare;
private int minShare;
private int maxShare;
private int clusterMaxMem;
private int maxApps;
private float fractionUsed;
private float fractionFairShare;
private float fractionMinShare;
private float fractionMemUsed;
private float fractionMemFairShare;
private float fractionMemMinShare;
private float fractionMemMaxShare;
private Resource minResources;
private Resource maxResources;
private Resource usedResources;
private Resource fairResources;
private Resource clusterResources;
private String queueName;
@ -54,23 +52,20 @@ public FairSchedulerQueueInfo(FSQueue queue, FairScheduler scheduler) {
queueName = queue.getName();
Resource clusterMax = scheduler.getClusterCapacity();
clusterMaxMem = clusterMax.getMemory();
clusterResources = scheduler.getClusterCapacity();
usedResources = queue.getResourceUsage();
fractionUsed = (float)usedResources.getMemory() / clusterMaxMem;
fractionMemUsed = (float)usedResources.getMemory() /
clusterResources.getMemory();
fairShare = queue.getFairShare().getMemory();
fairResources = queue.getFairShare();
minResources = queue.getMinShare();
minShare = minResources.getMemory();
maxResources = scheduler.getQueueManager().getMaxResources(queueName);
if (maxResources.getMemory() > clusterMaxMem) {
maxResources = Resources.createResource(clusterMaxMem);
}
maxShare = maxResources.getMemory();
maxResources = queue.getMaxShare();
maxResources = Resources.componentwiseMin(maxResources, clusterResources);
fractionFairShare = (float)fairShare / clusterMaxMem;
fractionMinShare = (float)minShare / clusterMaxMem;
fractionMemFairShare = (float)fairResources.getMemory() / clusterResources.getMemory();
fractionMemMinShare = (float)minResources.getMemory() / clusterResources.getMemory();
fractionMemMaxShare = (float)maxResources.getMemory() / clusterResources.getMemory();
maxApps = manager.getQueueMaxApps(queueName);
@ -88,15 +83,15 @@ public FairSchedulerQueueInfo(FSQueue queue, FairScheduler scheduler) {
/**
* Returns the fair share as a fraction of the entire cluster capacity.
*/
public float getFairShareFraction() {
return fractionFairShare;
public float getFairShareMemoryFraction() {
return fractionMemFairShare;
}
/**
* Returns the fair share of this queue in megabytes.
*/
public int getFairShare() {
return fairShare;
public Resource getFairShare() {
return fairResources;
}
public Resource getMinResources() {
@ -123,16 +118,16 @@ public Resource getUsedResources() {
* Returns the queue's min share in as a fraction of the entire
* cluster capacity.
*/
public float getMinShareFraction() {
return fractionMinShare;
public float getMinShareMemoryFraction() {
return fractionMemMinShare;
}
/**
* Returns the memory used by this queue as a fraction of the entire
* cluster capacity.
*/
public float getUsedFraction() {
return fractionUsed;
public float getUsedMemoryFraction() {
return fractionMemUsed;
}
/**
@ -140,7 +135,7 @@ public float getUsedFraction() {
* capacity.
*/
public float getMaxResourcesFraction() {
return (float)maxShare / clusterMaxMem;
return fractionMemMaxShare;
}
public Collection<FairSchedulerQueueInfo> getChildQueues() {

View File

@ -53,6 +53,7 @@ public void setup() throws IOException {
String queueName = "root.queue1";
QueueManager mockMgr = mock(QueueManager.class);
when(mockMgr.getMaxResources(queueName)).thenReturn(maxResource);
when(mockMgr.getMinResources(queueName)).thenReturn(Resources.none());
schedulable = new FSLeafQueue(queueName, mockMgr, scheduler, null);
}

View File

@ -378,6 +378,7 @@ public void testSimpleFairShareCalculation() {
// Divided three ways - betwen the two queues and the default queue
for (FSLeafQueue p : queues) {
assertEquals(3414, p.getFairShare().getMemory());
assertEquals(3414, p.getMetrics().getFairShareMB());
}
}
@ -406,8 +407,11 @@ public void testSimpleHierarchicalFairShareCalculation() {
FSLeafQueue queue2 = queueManager.getLeafQueue("parent.queue2");
FSLeafQueue queue3 = queueManager.getLeafQueue("parent.queue3");
assertEquals(capacity / 2, queue1.getFairShare().getMemory());
assertEquals(capacity / 2, queue1.getMetrics().getFairShareMB());
assertEquals(capacity / 4, queue2.getFairShare().getMemory());
assertEquals(capacity / 4, queue2.getMetrics().getFairShareMB());
assertEquals(capacity / 4, queue3.getFairShare().getMemory());
assertEquals(capacity / 4, queue3.getMetrics().getFairShareMB());
}
@Test