YARN-883. Expose Fair Scheduler-specific queue metrics. (sandyr via tucu)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1497884 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
27a50a2bef
commit
c221204cca
|
@ -41,6 +41,9 @@ Release 2.2.0 - UNRELEASED
|
|||
|
||||
YARN-736. Add a multi-resource fair sharing metric. (sandyr via tucu)
|
||||
|
||||
YARN-883. Expose Fair Scheduler-specific queue metrics. (sandyr via tucu)
|
||||
|
||||
|
||||
OPTIMIZATIONS
|
||||
|
||||
BUG FIXES
|
||||
|
|
|
@ -80,7 +80,7 @@ public class QueueMetrics implements MetricsSource {
|
|||
static final Logger LOG = LoggerFactory.getLogger(QueueMetrics.class);
|
||||
static final MetricsInfo RECORD_INFO = info("QueueMetrics",
|
||||
"Metrics for the resource scheduler");
|
||||
static final MetricsInfo QUEUE_INFO = info("Queue", "Metrics by queue");
|
||||
protected static final MetricsInfo QUEUE_INFO = info("Queue", "Metrics by queue");
|
||||
static final MetricsInfo USER_INFO = info("User", "Metrics by user");
|
||||
static final Splitter Q_SPLITTER =
|
||||
Splitter.on('.').omitEmptyStrings().trimResults();
|
||||
|
@ -92,7 +92,7 @@ public class QueueMetrics implements MetricsSource {
|
|||
private final Map<String, QueueMetrics> users;
|
||||
private final Configuration conf;
|
||||
|
||||
QueueMetrics(MetricsSystem ms, String queueName, Queue parent,
|
||||
protected QueueMetrics(MetricsSystem ms, String queueName, Queue parent,
|
||||
boolean enableUserMetrics, Configuration conf) {
|
||||
registry = new MetricsRegistry(RECORD_INFO);
|
||||
this.queueName = queueName;
|
||||
|
@ -104,12 +104,12 @@ public class QueueMetrics implements MetricsSource {
|
|||
runningTime = buildBuckets(conf);
|
||||
}
|
||||
|
||||
QueueMetrics tag(MetricsInfo info, String value) {
|
||||
protected QueueMetrics tag(MetricsInfo info, String value) {
|
||||
registry.tag(info, value);
|
||||
return this;
|
||||
}
|
||||
|
||||
static StringBuilder sourceName(String queueName) {
|
||||
protected static StringBuilder sourceName(String queueName) {
|
||||
StringBuilder sb = new StringBuilder(RECORD_INFO.name());
|
||||
int i = 0;
|
||||
for (String node : Q_SPLITTER.split(queueName)) {
|
||||
|
@ -128,7 +128,7 @@ public class QueueMetrics implements MetricsSource {
|
|||
|
||||
// this method is here because we want to make sure these metrics show up on
|
||||
// queue registration.
|
||||
private void initMetrics() {
|
||||
public void initMetrics() {
|
||||
appsSubmitted.incr(0);
|
||||
appsRunning.incr(0);
|
||||
appsPending.incr(0);
|
||||
|
@ -149,7 +149,7 @@ public class QueueMetrics implements MetricsSource {
|
|||
/**
|
||||
* Simple metrics cache to help prevent re-registrations.
|
||||
*/
|
||||
private static Map<String, QueueMetrics> queueMetrics =
|
||||
protected final static Map<String, QueueMetrics> queueMetrics =
|
||||
new HashMap<String, QueueMetrics>();
|
||||
|
||||
public synchronized
|
||||
|
|
|
@ -254,13 +254,6 @@ public class AppSchedulable extends Schedulable {
|
|||
}
|
||||
return Resources.none();
|
||||
}
|
||||
else {
|
||||
// TODO this should subtract resource just assigned
|
||||
// TEMPROARY
|
||||
getMetrics().setAvailableResourcesToQueue(
|
||||
scheduler.getClusterCapacity());
|
||||
}
|
||||
|
||||
|
||||
// If we had previously made a reservation, delete it
|
||||
if (reserved) {
|
||||
|
|
|
@ -58,7 +58,7 @@ public class FSParentQueue extends FSQueue {
|
|||
public void recomputeShares() {
|
||||
policy.computeShares(childQueues, getFairShare());
|
||||
for (FSQueue childQueue : childQueues) {
|
||||
childQueue.getMetrics().setAvailableResourcesToQueue(childQueue.getFairShare());
|
||||
childQueue.getMetrics().setFairShare(childQueue.getFairShare());
|
||||
childQueue.recomputeShares();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -36,7 +36,6 @@ import org.apache.hadoop.yarn.factories.RecordFactory;
|
|||
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
|
||||
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||
|
||||
@Private
|
||||
|
@ -45,7 +44,7 @@ public abstract class FSQueue extends Schedulable implements Queue {
|
|||
private final String name;
|
||||
private final QueueManager queueMgr;
|
||||
private final FairScheduler scheduler;
|
||||
private final QueueMetrics metrics;
|
||||
private final FSQueueMetrics metrics;
|
||||
|
||||
protected final FSParentQueue parent;
|
||||
protected final RecordFactory recordFactory =
|
||||
|
@ -58,7 +57,9 @@ public abstract class FSQueue extends Schedulable implements Queue {
|
|||
this.name = name;
|
||||
this.queueMgr = queueMgr;
|
||||
this.scheduler = scheduler;
|
||||
this.metrics = QueueMetrics.forQueue(getName(), parent, true, scheduler.getConf());
|
||||
this.metrics = FSQueueMetrics.forQueue(getName(), parent, true, scheduler.getConf());
|
||||
metrics.setMinShare(getMinShare());
|
||||
metrics.setMaxShare(getMaxShare());
|
||||
this.parent = parent;
|
||||
}
|
||||
|
||||
|
@ -141,10 +142,16 @@ public abstract class FSQueue extends Schedulable implements Queue {
|
|||
}
|
||||
|
||||
@Override
|
||||
public QueueMetrics getMetrics() {
|
||||
public FSQueueMetrics getMetrics() {
|
||||
return metrics;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setFairShare(Resource fairShare) {
|
||||
super.setFairShare(fairShare);
|
||||
metrics.setFairShare(fairShare);
|
||||
}
|
||||
|
||||
public boolean hasAccess(QueueACL acl, UserGroupInformation user) {
|
||||
// Check if the leaf-queue allows access
|
||||
if (queueMgr.getQueueAcls(getName()).get(acl).isUserAllowed(user)) {
|
||||
|
|
|
@ -0,0 +1,105 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.metrics2.MetricsSystem;
|
||||
import org.apache.hadoop.metrics2.annotation.Metric;
|
||||
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
|
||||
import org.apache.hadoop.metrics2.lib.MutableGaugeInt;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
|
||||
|
||||
public class FSQueueMetrics extends QueueMetrics {
|
||||
|
||||
@Metric("Fair share of memory in MB") MutableGaugeInt fairShareMB;
|
||||
@Metric("Fair share of CPU in vcores") MutableGaugeInt fairShareVCores;
|
||||
@Metric("Minimum share of memory in MB") MutableGaugeInt minShareMB;
|
||||
@Metric("Minimum share of CPU in vcores") MutableGaugeInt minShareVCores;
|
||||
@Metric("Maximum share of memory in MB") MutableGaugeInt maxShareMB;
|
||||
@Metric("Maximum share of CPU in vcores") MutableGaugeInt maxShareVCores;
|
||||
|
||||
FSQueueMetrics(MetricsSystem ms, String queueName, Queue parent,
|
||||
boolean enableUserMetrics, Configuration conf) {
|
||||
super(ms, queueName, parent, enableUserMetrics, conf);
|
||||
}
|
||||
|
||||
public void setFairShare(Resource resource) {
|
||||
fairShareMB.set(resource.getMemory());
|
||||
fairShareVCores.set(resource.getVirtualCores());
|
||||
}
|
||||
|
||||
public int getFairShareMB() {
|
||||
return fairShareMB.value();
|
||||
}
|
||||
|
||||
public int getFairShareVirtualCores() {
|
||||
return fairShareVCores.value();
|
||||
}
|
||||
|
||||
public void setMinShare(Resource resource) {
|
||||
minShareMB.set(resource.getMemory());
|
||||
minShareVCores.set(resource.getVirtualCores());
|
||||
}
|
||||
|
||||
public int getMinShareMB() {
|
||||
return minShareMB.value();
|
||||
}
|
||||
|
||||
public int getMinShareVirtualCores() {
|
||||
return minShareVCores.value();
|
||||
}
|
||||
|
||||
public void setMaxShare(Resource resource) {
|
||||
maxShareMB.set(resource.getMemory());
|
||||
maxShareVCores.set(resource.getVirtualCores());
|
||||
}
|
||||
|
||||
public int getMaxShareMB() {
|
||||
return maxShareMB.value();
|
||||
}
|
||||
|
||||
public int getMaxShareVirtualCores() {
|
||||
return maxShareVCores.value();
|
||||
}
|
||||
|
||||
public synchronized
|
||||
static FSQueueMetrics forQueue(String queueName, Queue parent,
|
||||
boolean enableUserMetrics, Configuration conf) {
|
||||
MetricsSystem ms = DefaultMetricsSystem.instance();
|
||||
QueueMetrics metrics = queueMetrics.get(queueName);
|
||||
if (metrics == null) {
|
||||
metrics = new FSQueueMetrics(ms, queueName, parent, enableUserMetrics, conf)
|
||||
.tag(QUEUE_INFO, queueName);
|
||||
|
||||
// Register with the MetricsSystems
|
||||
if (ms != null) {
|
||||
metrics = ms.register(
|
||||
sourceName(queueName).toString(),
|
||||
"Metrics for queue: " + queueName, metrics);
|
||||
metrics.initMetrics();
|
||||
}
|
||||
queueMetrics.put(queueName, metrics);
|
||||
}
|
||||
|
||||
return (FSQueueMetrics)metrics;
|
||||
}
|
||||
|
||||
}
|
|
@ -142,7 +142,7 @@ public class FairScheduler implements ResourceScheduler {
|
|||
new Allocation(EMPTY_CONTAINER_LIST, Resources.createResource(0));
|
||||
|
||||
// Aggregate metrics
|
||||
QueueMetrics rootMetrics;
|
||||
FSQueueMetrics rootMetrics;
|
||||
|
||||
// Time when we last updated preemption vars
|
||||
protected long lastPreemptionUpdateTime;
|
||||
|
@ -1045,7 +1045,7 @@ public class FairScheduler implements ResourceScheduler {
|
|||
waitTimeBeforeKill = this.conf.getWaitTimeBeforeKill();
|
||||
|
||||
if (!initialized) {
|
||||
rootMetrics = QueueMetrics.forQueue("root", null, true, conf);
|
||||
rootMetrics = FSQueueMetrics.forQueue("root", null, true, conf);
|
||||
this.rmContext = rmContext;
|
||||
this.eventLog = new FairSchedulerEventLog();
|
||||
eventLog.init(this.conf);
|
||||
|
|
|
@ -390,6 +390,13 @@ public class QueueManager {
|
|||
queueMaxAppsDefault, defaultSchedPolicy, minSharePreemptionTimeouts,
|
||||
queueAcls, fairSharePreemptionTimeout, defaultMinSharePreemptionTimeout);
|
||||
|
||||
// Update metrics
|
||||
for (FSQueue queue : queues.values()) {
|
||||
FSQueueMetrics queueMetrics = queue.getMetrics();
|
||||
queueMetrics.setMinShare(queue.getMinShare());
|
||||
queueMetrics.setMaxShare(queue.getMaxShare());
|
||||
}
|
||||
|
||||
// Root queue should have empty ACLs. As a queue's ACL is the union of
|
||||
// its ACL and all its parents' ACLs, setting the roots' to empty will
|
||||
// neither allow nor prohibit more access to its children.
|
||||
|
|
|
@ -73,7 +73,7 @@ public class FairSchedulerPage extends RmView {
|
|||
if (maxApps < Integer.MAX_VALUE) {
|
||||
ri._("Max Running Applications:", qinfo.getMaxApplications());
|
||||
}
|
||||
ri._("Fair Share:", qinfo.getFairShare());
|
||||
ri._("Fair Share:", qinfo.getFairShare().toString());
|
||||
|
||||
html._(InfoBlock.class);
|
||||
|
||||
|
@ -95,8 +95,8 @@ public class FairSchedulerPage extends RmView {
|
|||
UL<Hamlet> ul = html.ul("#pq");
|
||||
for (FairSchedulerQueueInfo info : subQueues) {
|
||||
float capacity = info.getMaxResourcesFraction();
|
||||
float fairShare = info.getFairShareFraction();
|
||||
float used = info.getUsedFraction();
|
||||
float fairShare = info.getFairShareMemoryFraction();
|
||||
float used = info.getUsedMemoryFraction();
|
||||
LI<UL<Hamlet>> li = ul.
|
||||
li().
|
||||
a(_Q).$style(width(capacity * Q_MAX_WIDTH)).
|
||||
|
@ -150,7 +150,7 @@ public class FairSchedulerPage extends RmView {
|
|||
} else {
|
||||
FairSchedulerInfo sinfo = new FairSchedulerInfo(fs);
|
||||
fsqinfo.qinfo = sinfo.getRootQueueInfo();
|
||||
float used = fsqinfo.qinfo.getUsedFraction();
|
||||
float used = fsqinfo.qinfo.getUsedMemoryFraction();
|
||||
|
||||
ul.
|
||||
li().$style("margin-bottom: 1em").
|
||||
|
|
|
@ -30,20 +30,18 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.QueueManager
|
|||
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||
|
||||
public class FairSchedulerQueueInfo {
|
||||
private int fairShare;
|
||||
private int minShare;
|
||||
private int maxShare;
|
||||
private int clusterMaxMem;
|
||||
|
||||
private int maxApps;
|
||||
|
||||
private float fractionUsed;
|
||||
private float fractionFairShare;
|
||||
private float fractionMinShare;
|
||||
private float fractionMemUsed;
|
||||
private float fractionMemFairShare;
|
||||
private float fractionMemMinShare;
|
||||
private float fractionMemMaxShare;
|
||||
|
||||
private Resource minResources;
|
||||
private Resource maxResources;
|
||||
private Resource usedResources;
|
||||
private Resource fairResources;
|
||||
private Resource clusterResources;
|
||||
|
||||
private String queueName;
|
||||
|
||||
|
@ -54,23 +52,20 @@ public class FairSchedulerQueueInfo {
|
|||
|
||||
queueName = queue.getName();
|
||||
|
||||
Resource clusterMax = scheduler.getClusterCapacity();
|
||||
clusterMaxMem = clusterMax.getMemory();
|
||||
clusterResources = scheduler.getClusterCapacity();
|
||||
|
||||
usedResources = queue.getResourceUsage();
|
||||
fractionUsed = (float)usedResources.getMemory() / clusterMaxMem;
|
||||
fractionMemUsed = (float)usedResources.getMemory() /
|
||||
clusterResources.getMemory();
|
||||
|
||||
fairShare = queue.getFairShare().getMemory();
|
||||
fairResources = queue.getFairShare();
|
||||
minResources = queue.getMinShare();
|
||||
minShare = minResources.getMemory();
|
||||
maxResources = scheduler.getQueueManager().getMaxResources(queueName);
|
||||
if (maxResources.getMemory() > clusterMaxMem) {
|
||||
maxResources = Resources.createResource(clusterMaxMem);
|
||||
}
|
||||
maxShare = maxResources.getMemory();
|
||||
maxResources = queue.getMaxShare();
|
||||
maxResources = Resources.componentwiseMin(maxResources, clusterResources);
|
||||
|
||||
fractionFairShare = (float)fairShare / clusterMaxMem;
|
||||
fractionMinShare = (float)minShare / clusterMaxMem;
|
||||
fractionMemFairShare = (float)fairResources.getMemory() / clusterResources.getMemory();
|
||||
fractionMemMinShare = (float)minResources.getMemory() / clusterResources.getMemory();
|
||||
fractionMemMaxShare = (float)maxResources.getMemory() / clusterResources.getMemory();
|
||||
|
||||
maxApps = manager.getQueueMaxApps(queueName);
|
||||
|
||||
|
@ -88,15 +83,15 @@ public class FairSchedulerQueueInfo {
|
|||
/**
|
||||
* Returns the fair share as a fraction of the entire cluster capacity.
|
||||
*/
|
||||
public float getFairShareFraction() {
|
||||
return fractionFairShare;
|
||||
public float getFairShareMemoryFraction() {
|
||||
return fractionMemFairShare;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the fair share of this queue in megabytes.
|
||||
*/
|
||||
public int getFairShare() {
|
||||
return fairShare;
|
||||
public Resource getFairShare() {
|
||||
return fairResources;
|
||||
}
|
||||
|
||||
public Resource getMinResources() {
|
||||
|
@ -123,16 +118,16 @@ public class FairSchedulerQueueInfo {
|
|||
* Returns the queue's min share in as a fraction of the entire
|
||||
* cluster capacity.
|
||||
*/
|
||||
public float getMinShareFraction() {
|
||||
return fractionMinShare;
|
||||
public float getMinShareMemoryFraction() {
|
||||
return fractionMemMinShare;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the memory used by this queue as a fraction of the entire
|
||||
* cluster capacity.
|
||||
*/
|
||||
public float getUsedFraction() {
|
||||
return fractionUsed;
|
||||
public float getUsedMemoryFraction() {
|
||||
return fractionMemUsed;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -140,7 +135,7 @@ public class FairSchedulerQueueInfo {
|
|||
* capacity.
|
||||
*/
|
||||
public float getMaxResourcesFraction() {
|
||||
return (float)maxShare / clusterMaxMem;
|
||||
return fractionMemMaxShare;
|
||||
}
|
||||
|
||||
public Collection<FairSchedulerQueueInfo> getChildQueues() {
|
||||
|
|
|
@ -53,6 +53,7 @@ public class TestFSLeafQueue {
|
|||
String queueName = "root.queue1";
|
||||
QueueManager mockMgr = mock(QueueManager.class);
|
||||
when(mockMgr.getMaxResources(queueName)).thenReturn(maxResource);
|
||||
when(mockMgr.getMinResources(queueName)).thenReturn(Resources.none());
|
||||
|
||||
schedulable = new FSLeafQueue(queueName, mockMgr, scheduler, null);
|
||||
}
|
||||
|
|
|
@ -378,6 +378,7 @@ public class TestFairScheduler {
|
|||
// Divided three ways - betwen the two queues and the default queue
|
||||
for (FSLeafQueue p : queues) {
|
||||
assertEquals(3414, p.getFairShare().getMemory());
|
||||
assertEquals(3414, p.getMetrics().getFairShareMB());
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -406,8 +407,11 @@ public class TestFairScheduler {
|
|||
FSLeafQueue queue2 = queueManager.getLeafQueue("parent.queue2");
|
||||
FSLeafQueue queue3 = queueManager.getLeafQueue("parent.queue3");
|
||||
assertEquals(capacity / 2, queue1.getFairShare().getMemory());
|
||||
assertEquals(capacity / 2, queue1.getMetrics().getFairShareMB());
|
||||
assertEquals(capacity / 4, queue2.getFairShare().getMemory());
|
||||
assertEquals(capacity / 4, queue2.getMetrics().getFairShareMB());
|
||||
assertEquals(capacity / 4, queue3.getFairShare().getMemory());
|
||||
assertEquals(capacity / 4, queue3.getMetrics().getFairShareMB());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
Loading…
Reference in New Issue