MAPREDUCE-3773. Add queue metrics with buckets for job run times. Contributed by Owen O'Malley.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1299100 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
558cc8c0bf
commit
ed4c222d5c
|
@ -113,7 +113,11 @@ Release 0.23.3 - UNRELEASED
|
|||
MAPREDUCE-3885. Avoid an unnecessary copy for all requests/responses in
|
||||
MRs ProtoOverHadoopRpcEngine. (Devaraj Das via sseth)
|
||||
|
||||
MAPREDUCE-3991. Streaming FAQ has some wrong instructions about input files splitting. (harsh)
|
||||
MAPREDUCE-3991. Streaming FAQ has some wrong instructions about input files
|
||||
splitting. (harsh)
|
||||
|
||||
MAPREDUCE-3773. Add queue metrics with buckets for job run times. (omalley
|
||||
via acmurthy)
|
||||
|
||||
OPTIMIZATIONS
|
||||
|
||||
|
|
|
@ -233,7 +233,19 @@ public class YarnConfiguration extends Configuration {
|
|||
|
||||
/** Default queue name */
|
||||
public static final String DEFAULT_QUEUE_NAME = "default";
|
||||
|
||||
|
||||
/**
|
||||
* Buckets (in minutes) for the number of apps running in each queue.
|
||||
*/
|
||||
public static final String RM_METRICS_RUNTIME_BUCKETS =
|
||||
RM_PREFIX + "metrics.runtime.buckets";
|
||||
|
||||
/**
|
||||
* Default sizes of the runtime metric buckets in minutes.
|
||||
*/
|
||||
public static final String DEFAULT_RM_METRICS_RUNTIME_BUCKETS =
|
||||
"60,300,1440";
|
||||
|
||||
////////////////////////////////
|
||||
// Node Manager Configs
|
||||
////////////////////////////////
|
||||
|
|
|
@ -231,7 +231,7 @@ public class AppSchedulingInfo {
|
|||
// once an allocation is done we assume the application is
|
||||
// running from scheduler's POV.
|
||||
pending = false;
|
||||
metrics.incrAppsRunning(user);
|
||||
metrics.incrAppsRunning(this, user);
|
||||
}
|
||||
LOG.debug("allocate: user: " + user + ", memory: "
|
||||
+ request.getCapability());
|
||||
|
|
|
@ -21,11 +21,15 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
|
|||
import static org.apache.hadoop.metrics2.lib.Interns.info;
|
||||
import static org.apache.hadoop.yarn.server.resourcemanager.resource.Resources.multiply;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.metrics2.MetricsCollector;
|
||||
import org.apache.hadoop.metrics2.MetricsInfo;
|
||||
import org.apache.hadoop.metrics2.MetricsSource;
|
||||
import org.apache.hadoop.metrics2.MetricsSystem;
|
||||
import org.apache.hadoop.metrics2.annotation.Metric;
|
||||
import org.apache.hadoop.metrics2.annotation.Metrics;
|
||||
|
@ -34,7 +38,9 @@ import org.apache.hadoop.metrics2.lib.MetricsRegistry;
|
|||
import org.apache.hadoop.metrics2.lib.MutableCounterInt;
|
||||
import org.apache.hadoop.metrics2.lib.MutableCounterLong;
|
||||
import org.apache.hadoop.metrics2.lib.MutableGaugeInt;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
@ -43,7 +49,7 @@ import com.google.common.base.Splitter;
|
|||
|
||||
@InterfaceAudience.Private
|
||||
@Metrics(context="yarn")
|
||||
public class QueueMetrics {
|
||||
public class QueueMetrics implements MetricsSource {
|
||||
@Metric("# of apps submitted") MutableCounterInt appsSubmitted;
|
||||
@Metric("# of running apps") MutableGaugeInt appsRunning;
|
||||
@Metric("# of pending apps") MutableGaugeInt appsPending;
|
||||
|
@ -62,6 +68,8 @@ public class QueueMetrics {
|
|||
@Metric("# of reserved containers") MutableGaugeInt reservedContainers;
|
||||
@Metric("# of active users") MutableGaugeInt activeUsers;
|
||||
@Metric("# of active users") MutableGaugeInt activeApplications;
|
||||
private final MutableGaugeInt[] runningTime;
|
||||
private TimeBucketMetrics<ApplicationId> runBuckets;
|
||||
|
||||
static final Logger LOG = LoggerFactory.getLogger(QueueMetrics.class);
|
||||
static final MetricsInfo RECORD_INFO = info("QueueMetrics",
|
||||
|
@ -76,14 +84,18 @@ public class QueueMetrics {
|
|||
final QueueMetrics parent;
|
||||
final MetricsSystem metricsSystem;
|
||||
private final Map<String, QueueMetrics> users;
|
||||
private final Configuration conf;
|
||||
|
||||
QueueMetrics(MetricsSystem ms, String queueName, Queue parent, boolean enableUserMetrics) {
|
||||
QueueMetrics(MetricsSystem ms, String queueName, Queue parent,
|
||||
boolean enableUserMetrics, Configuration conf) {
|
||||
registry = new MetricsRegistry(RECORD_INFO);
|
||||
this.queueName = queueName;
|
||||
this.parent = parent != null ? parent.getMetrics() : null;
|
||||
this.users = enableUserMetrics ? new HashMap<String, QueueMetrics>()
|
||||
: null;
|
||||
metricsSystem = ms;
|
||||
this.conf = conf;
|
||||
runningTime = buildBuckets(conf);
|
||||
}
|
||||
|
||||
QueueMetrics tag(MetricsInfo info, String value) {
|
||||
|
@ -102,15 +114,18 @@ public class QueueMetrics {
|
|||
|
||||
public synchronized
|
||||
static QueueMetrics forQueue(String queueName, Queue parent,
|
||||
boolean enableUserMetrics) {
|
||||
boolean enableUserMetrics,
|
||||
Configuration conf) {
|
||||
return forQueue(DefaultMetricsSystem.instance(), queueName, parent,
|
||||
enableUserMetrics);
|
||||
enableUserMetrics, conf);
|
||||
}
|
||||
|
||||
public static QueueMetrics forQueue(MetricsSystem ms, String queueName,
|
||||
Queue parent, boolean enableUserMetrics) {
|
||||
QueueMetrics metrics = new QueueMetrics(ms, queueName, parent,
|
||||
enableUserMetrics).tag(QUEUE_INFO, queueName);
|
||||
Queue parent, boolean enableUserMetrics,
|
||||
Configuration conf) {
|
||||
QueueMetrics metrics =
|
||||
new QueueMetrics(ms, queueName, parent, enableUserMetrics, conf
|
||||
).tag(QUEUE_INFO, queueName);
|
||||
return ms == null ? metrics : ms.register(sourceName(queueName).toString(),
|
||||
"Metrics for queue: " + queueName, metrics);
|
||||
}
|
||||
|
@ -121,7 +136,7 @@ public class QueueMetrics {
|
|||
}
|
||||
QueueMetrics metrics = users.get(userName);
|
||||
if (metrics == null) {
|
||||
metrics = new QueueMetrics(metricsSystem, queueName, null, false);
|
||||
metrics = new QueueMetrics(metricsSystem, queueName, null, false, conf);
|
||||
users.put(userName, metrics);
|
||||
metricsSystem.register(
|
||||
sourceName(queueName).append(",user=").append(userName).toString(),
|
||||
|
@ -131,6 +146,41 @@ public class QueueMetrics {
|
|||
return metrics;
|
||||
}
|
||||
|
||||
private ArrayList<Integer> parseInts(String value) {
|
||||
ArrayList<Integer> result = new ArrayList<Integer>();
|
||||
for(String s: value.split(",")) {
|
||||
result.add(Integer.parseInt(s.trim()));
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
private MutableGaugeInt[] buildBuckets(Configuration conf) {
|
||||
ArrayList<Integer> buckets =
|
||||
parseInts(conf.get(YarnConfiguration.RM_METRICS_RUNTIME_BUCKETS,
|
||||
YarnConfiguration.DEFAULT_RM_METRICS_RUNTIME_BUCKETS));
|
||||
MutableGaugeInt[] result = new MutableGaugeInt[buckets.size() + 1];
|
||||
result[0] = registry.newGauge("running_0", "", 0);
|
||||
long[] cuts = new long[buckets.size()];
|
||||
for(int i=0; i < buckets.size(); ++i) {
|
||||
result[i+1] = registry.newGauge("running_" + buckets.get(i), "", 0);
|
||||
cuts[i] = buckets.get(i) * 1000L * 60; // covert from min to ms
|
||||
}
|
||||
this.runBuckets = new TimeBucketMetrics<ApplicationId>(cuts);
|
||||
return result;
|
||||
}
|
||||
|
||||
private void updateRunningTime() {
|
||||
int[] counts = runBuckets.getBucketCounts(System.currentTimeMillis());
|
||||
for(int i=0; i < counts.length; ++i) {
|
||||
runningTime[i].set(counts[i]);
|
||||
}
|
||||
}
|
||||
|
||||
public void getMetrics(MetricsCollector collector, boolean all) {
|
||||
updateRunningTime();
|
||||
registry.snapshot(collector.addRecord(registry.info()), all);
|
||||
}
|
||||
|
||||
public void submitApp(String user) {
|
||||
appsSubmitted.incr();
|
||||
appsPending.incr();
|
||||
|
@ -143,20 +193,22 @@ public class QueueMetrics {
|
|||
}
|
||||
}
|
||||
|
||||
public void incrAppsRunning(String user) {
|
||||
public void incrAppsRunning(AppSchedulingInfo app, String user) {
|
||||
runBuckets.add(app.getApplicationId(), System.currentTimeMillis());
|
||||
appsRunning.incr();
|
||||
appsPending.decr();
|
||||
QueueMetrics userMetrics = getUserMetrics(user);
|
||||
if (userMetrics != null) {
|
||||
userMetrics.incrAppsRunning(user);
|
||||
userMetrics.incrAppsRunning(app, user);
|
||||
}
|
||||
if (parent != null) {
|
||||
parent.incrAppsRunning(user);
|
||||
parent.incrAppsRunning(app, user);
|
||||
}
|
||||
}
|
||||
|
||||
public void finishApp(AppSchedulingInfo app,
|
||||
RMAppAttemptState rmAppAttemptFinalState) {
|
||||
runBuckets.remove(app.getApplicationId());
|
||||
switch (rmAppAttemptFinalState) {
|
||||
case KILLED: appsKilled.incr(); break;
|
||||
case FAILED: appsFailed.incr(); break;
|
||||
|
|
|
@ -0,0 +1,81 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
|
||||
|
||||
import java.util.HashMap;
|
||||
|
||||
/**
|
||||
* Create a set of buckets that hold key-time pairs. When the values of the
|
||||
* buckets is queried, the number of objects with time differences in the
|
||||
* different buckets is returned.
|
||||
*/
|
||||
class TimeBucketMetrics<OBJ> {
|
||||
|
||||
private final HashMap<OBJ, Long> map = new HashMap<OBJ, Long>();
|
||||
private final int[] counts;
|
||||
private final long[] cuts;
|
||||
|
||||
/**
|
||||
* Create a set of buckets based on a set of time points. The number of
|
||||
* buckets is one more than the number of points.
|
||||
*/
|
||||
TimeBucketMetrics(long[] cuts) {
|
||||
this.cuts = cuts;
|
||||
counts = new int[cuts.length + 1];
|
||||
}
|
||||
|
||||
/**
|
||||
* Add an object to be counted
|
||||
*/
|
||||
synchronized void add(OBJ key, long time) {
|
||||
map.put(key, time);
|
||||
}
|
||||
|
||||
/**
|
||||
* Remove an object to be counted
|
||||
*/
|
||||
synchronized void remove(OBJ key) {
|
||||
map.remove(key);
|
||||
}
|
||||
|
||||
/**
|
||||
* Find the bucket based on the cut points.
|
||||
*/
|
||||
private int findBucket(long val) {
|
||||
for(int i=0; i < cuts.length; ++i) {
|
||||
if (val < cuts[i]) {
|
||||
return i;
|
||||
}
|
||||
}
|
||||
return cuts.length;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the counts of how many keys are in each bucket. The same array is
|
||||
* returned by each call to this method.
|
||||
*/
|
||||
synchronized int[] getBucketCounts(long now) {
|
||||
for(int i=0; i < counts.length; ++i) {
|
||||
counts[i] = 0;
|
||||
}
|
||||
for(Long time: map.values()) {
|
||||
counts[findBucket(now - time)] += 1;
|
||||
}
|
||||
return counts;
|
||||
}
|
||||
}
|
|
@ -31,9 +31,11 @@ import org.apache.commons.logging.LogFactory;
|
|||
import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
import org.apache.hadoop.classification.InterfaceStability.Evolving;
|
||||
import org.apache.hadoop.conf.Configurable;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.security.AccessControlException;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.Lock;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.Container;
|
||||
|
@ -79,7 +81,7 @@ import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager;
|
|||
@Evolving
|
||||
@SuppressWarnings("unchecked")
|
||||
public class CapacityScheduler
|
||||
implements ResourceScheduler, CapacitySchedulerContext {
|
||||
implements ResourceScheduler, CapacitySchedulerContext, Configurable {
|
||||
|
||||
private static final Log LOG = LogFactory.getLog(CapacityScheduler.class);
|
||||
|
||||
|
@ -109,7 +111,21 @@ implements ResourceScheduler, CapacitySchedulerContext {
|
|||
}
|
||||
};
|
||||
|
||||
public void setConf(Configuration conf) {
|
||||
if (conf instanceof YarnConfiguration) {
|
||||
yarnConf = (YarnConfiguration) conf;
|
||||
} else {
|
||||
throw new IllegalArgumentException("Can only configure with " +
|
||||
"YarnConfiguration");
|
||||
}
|
||||
}
|
||||
|
||||
public Configuration getConf() {
|
||||
return yarnConf;
|
||||
}
|
||||
|
||||
private CapacitySchedulerConfiguration conf;
|
||||
private YarnConfiguration yarnConf;
|
||||
private ContainerTokenSecretManager containerTokenSecretManager;
|
||||
private RMContext rmContext;
|
||||
|
||||
|
|
|
@ -18,6 +18,7 @@
|
|||
|
||||
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
||||
import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager;
|
||||
|
@ -39,4 +40,9 @@ public interface CapacitySchedulerContext {
|
|||
RMContext getRMContext();
|
||||
|
||||
Resource getClusterResources();
|
||||
|
||||
/**
|
||||
* Get the yarn configuration.
|
||||
*/
|
||||
Configuration getConf();
|
||||
}
|
||||
|
|
|
@ -135,7 +135,8 @@ public class LeafQueue implements CSQueue {
|
|||
// must be after parent and queueName are initialized
|
||||
this.metrics = old != null ? old.getMetrics() :
|
||||
QueueMetrics.forQueue(getQueuePath(), parent,
|
||||
cs.getConfiguration().getEnableUserMetrics());
|
||||
cs.getConfiguration().getEnableUserMetrics(),
|
||||
cs.getConf());
|
||||
this.activeUsersManager = new ActiveUsersManager(metrics);
|
||||
this.minimumAllocation = cs.getMinimumResourceCapability();
|
||||
this.maximumAllocation = cs.getMaximumResourceCapability();
|
||||
|
|
|
@ -108,7 +108,8 @@ public class ParentQueue implements CSQueue {
|
|||
// must be called after parent and queueName is set
|
||||
this.metrics = old != null ? old.getMetrics() :
|
||||
QueueMetrics.forQueue(getQueuePath(), parent,
|
||||
cs.getConfiguration().getEnableUserMetrics());
|
||||
cs.getConfiguration().getEnableUserMetrics(),
|
||||
cs.getConf());
|
||||
|
||||
int rawCapacity = cs.getConfiguration().getCapacity(getQueuePath());
|
||||
|
||||
|
|
|
@ -34,6 +34,7 @@ import org.apache.commons.logging.LogFactory;
|
|||
import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
import org.apache.hadoop.classification.InterfaceStability.Evolving;
|
||||
import org.apache.hadoop.conf.Configurable;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.security.authorize.AccessControlList;
|
||||
|
@ -90,7 +91,7 @@ import org.apache.hadoop.yarn.util.BuilderUtils;
|
|||
@LimitedPrivate("yarn")
|
||||
@Evolving
|
||||
@SuppressWarnings("unchecked")
|
||||
public class FifoScheduler implements ResourceScheduler {
|
||||
public class FifoScheduler implements ResourceScheduler, Configurable {
|
||||
|
||||
private static final Log LOG = LogFactory.getLog(FifoScheduler.class);
|
||||
|
||||
|
@ -126,10 +127,10 @@ public class FifoScheduler implements ResourceScheduler {
|
|||
private Map<ApplicationAttemptId, SchedulerApp> applications
|
||||
= new TreeMap<ApplicationAttemptId, SchedulerApp>();
|
||||
|
||||
private final ActiveUsersManager activeUsersManager;
|
||||
private ActiveUsersManager activeUsersManager;
|
||||
|
||||
private static final String DEFAULT_QUEUE_NAME = "default";
|
||||
private final QueueMetrics metrics;
|
||||
private QueueMetrics metrics;
|
||||
|
||||
private final Queue DEFAULT_QUEUE = new Queue() {
|
||||
@Override
|
||||
|
@ -181,11 +182,18 @@ public class FifoScheduler implements ResourceScheduler {
|
|||
}
|
||||
};
|
||||
|
||||
public FifoScheduler() {
|
||||
metrics = QueueMetrics.forQueue(DEFAULT_QUEUE_NAME, null, false);
|
||||
@Override
|
||||
public synchronized void setConf(Configuration conf) {
|
||||
this.conf = conf;
|
||||
metrics = QueueMetrics.forQueue(DEFAULT_QUEUE_NAME, null, false, conf);
|
||||
activeUsersManager = new ActiveUsersManager(metrics);
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized Configuration getConf() {
|
||||
return conf;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Resource getMinimumResourceCapability() {
|
||||
return minimumAllocation;
|
||||
|
@ -207,8 +215,8 @@ public class FifoScheduler implements ResourceScheduler {
|
|||
RMContext rmContext)
|
||||
throws IOException
|
||||
{
|
||||
setConf(conf);
|
||||
if (!this.initialized) {
|
||||
this.conf = conf;
|
||||
this.containerTokenSecretManager = containerTokenSecretManager;
|
||||
this.rmContext = rmContext;
|
||||
this.minimumAllocation =
|
||||
|
@ -216,8 +224,6 @@ public class FifoScheduler implements ResourceScheduler {
|
|||
this.maximumAllocation =
|
||||
Resources.createResource(conf.getInt(MAXIMUM_ALLOCATION, MAXIMUM_MEMORY));
|
||||
this.initialized = true;
|
||||
} else {
|
||||
this.conf = conf;
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -27,6 +27,7 @@ import static org.junit.Assert.assertNull;
|
|||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
|
||||
import org.apache.hadoop.metrics2.MetricsSource;
|
||||
import org.apache.hadoop.metrics2.MetricsSystem;
|
||||
|
@ -38,6 +39,7 @@ import org.junit.Test;
|
|||
|
||||
public class TestQueueMetrics {
|
||||
static final int GB = 1024; // MB
|
||||
private static final Configuration conf = new Configuration();
|
||||
|
||||
final MetricsSystem ms = new MetricsSystemImpl();
|
||||
|
||||
|
@ -45,7 +47,8 @@ public class TestQueueMetrics {
|
|||
String queueName = "single";
|
||||
String user = "alice";
|
||||
|
||||
QueueMetrics metrics = QueueMetrics.forQueue(ms, queueName, null, false);
|
||||
QueueMetrics metrics = QueueMetrics.forQueue(ms, queueName, null, false,
|
||||
conf);
|
||||
MetricsSource queueSource= queueSource(ms, queueName);
|
||||
AppSchedulingInfo app = mockApp(user);
|
||||
|
||||
|
@ -59,7 +62,7 @@ public class TestQueueMetrics {
|
|||
// configurable cluster/queue resources
|
||||
checkResources(queueSource, 0, 0, 0, 0, 100*GB, 15*GB, 5, 0, 0);
|
||||
|
||||
metrics.incrAppsRunning(user);
|
||||
metrics.incrAppsRunning(app, user);
|
||||
checkApps(queueSource, 1, 0, 1, 0, 0, 0);
|
||||
|
||||
metrics.allocateResources(user, 3, Resources.createResource(2*GB));
|
||||
|
@ -77,7 +80,8 @@ public class TestQueueMetrics {
|
|||
String queueName = "single2";
|
||||
String user = "dodo";
|
||||
|
||||
QueueMetrics metrics = QueueMetrics.forQueue(ms, queueName, null, true);
|
||||
QueueMetrics metrics = QueueMetrics.forQueue(ms, queueName, null, true,
|
||||
conf);
|
||||
MetricsSource queueSource = queueSource(ms, queueName);
|
||||
AppSchedulingInfo app = mockApp(user);
|
||||
|
||||
|
@ -95,7 +99,7 @@ public class TestQueueMetrics {
|
|||
checkResources(queueSource, 0, 0, 0, 0, 100*GB, 15*GB, 5, 0, 0);
|
||||
checkResources(userSource, 0, 0, 0, 0, 10*GB, 15*GB, 5, 0, 0);
|
||||
|
||||
metrics.incrAppsRunning(user);
|
||||
metrics.incrAppsRunning(app, user);
|
||||
checkApps(queueSource, 1, 0, 1, 0, 0, 0);
|
||||
checkApps(userSource, 1, 0, 1, 0, 0, 0);
|
||||
|
||||
|
@ -118,11 +122,11 @@ public class TestQueueMetrics {
|
|||
String user = "alice";
|
||||
|
||||
QueueMetrics parentMetrics =
|
||||
QueueMetrics.forQueue(ms, parentQueueName, null, true);
|
||||
QueueMetrics.forQueue(ms, parentQueueName, null, true, conf);
|
||||
Queue parentQueue = make(stub(Queue.class).returning(parentMetrics).
|
||||
from.getMetrics());
|
||||
QueueMetrics metrics =
|
||||
QueueMetrics.forQueue(ms, leafQueueName, parentQueue, true);
|
||||
QueueMetrics.forQueue(ms, leafQueueName, parentQueue, true, conf);
|
||||
MetricsSource parentQueueSource = queueSource(ms, parentQueueName);
|
||||
MetricsSource queueSource = queueSource(ms, leafQueueName);
|
||||
AppSchedulingInfo app = mockApp(user);
|
||||
|
@ -146,7 +150,7 @@ public class TestQueueMetrics {
|
|||
checkResources(userSource, 0, 0, 0, 0, 10*GB, 15*GB, 5, 0, 0);
|
||||
checkResources(parentUserSource, 0, 0, 0, 0, 10*GB, 15*GB, 5, 0, 0);
|
||||
|
||||
metrics.incrAppsRunning(user);
|
||||
metrics.incrAppsRunning(app, user);
|
||||
checkApps(queueSource, 1, 0, 1, 0, 0, 0);
|
||||
checkApps(userSource, 1, 0, 1, 0, 0, 0);
|
||||
|
||||
|
|
|
@ -34,6 +34,7 @@ import org.apache.hadoop.yarn.api.records.Priority;
|
|||
import org.apache.hadoop.yarn.api.records.QueueACL;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.factories.RecordFactory;
|
||||
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
||||
|
@ -56,11 +57,13 @@ public class TestApplicationLimits {
|
|||
public void setUp() throws IOException {
|
||||
CapacitySchedulerConfiguration csConf =
|
||||
new CapacitySchedulerConfiguration();
|
||||
YarnConfiguration conf = new YarnConfiguration();
|
||||
setupQueueConfiguration(csConf);
|
||||
|
||||
|
||||
CapacitySchedulerContext csContext = mock(CapacitySchedulerContext.class);
|
||||
when(csContext.getConfiguration()).thenReturn(csConf);
|
||||
when(csContext.getConf()).thenReturn(conf);
|
||||
when(csContext.getMinimumResourceCapability()).thenReturn(Resources.createResource(GB));
|
||||
when(csContext.getMaximumResourceCapability()).thenReturn(Resources.createResource(16*GB));
|
||||
when(csContext.getClusterResources()).thenReturn(Resources.createResource(10 * 16 * GB));
|
||||
|
@ -124,10 +127,11 @@ public class TestApplicationLimits {
|
|||
CapacitySchedulerConfiguration csConf =
|
||||
new CapacitySchedulerConfiguration();
|
||||
setupQueueConfiguration(csConf);
|
||||
|
||||
YarnConfiguration conf = new YarnConfiguration();
|
||||
|
||||
CapacitySchedulerContext csContext = mock(CapacitySchedulerContext.class);
|
||||
when(csContext.getConfiguration()).thenReturn(csConf);
|
||||
when(csContext.getConf()).thenReturn(conf);
|
||||
when(csContext.getMinimumResourceCapability()).
|
||||
thenReturn(Resources.createResource(GB));
|
||||
when(csContext.getMaximumResourceCapability()).
|
||||
|
@ -363,9 +367,11 @@ public class TestApplicationLimits {
|
|||
new CapacitySchedulerConfiguration();
|
||||
csConf.setUserLimit(CapacitySchedulerConfiguration.ROOT + "." + A, 25);
|
||||
setupQueueConfiguration(csConf);
|
||||
YarnConfiguration conf = new YarnConfiguration();
|
||||
|
||||
CapacitySchedulerContext csContext = mock(CapacitySchedulerContext.class);
|
||||
when(csContext.getConfiguration()).thenReturn(csConf);
|
||||
when(csContext.getConf()).thenReturn(conf);
|
||||
when(csContext.getMinimumResourceCapability()).
|
||||
thenReturn(Resources.createResource(GB));
|
||||
when(csContext.getMaximumResourceCapability()).
|
||||
|
|
|
@ -73,12 +73,13 @@ public class TestCapacityScheduler {
|
|||
public void setUp() throws Exception {
|
||||
Store store = StoreFactory.getStore(new Configuration());
|
||||
resourceManager = new ResourceManager(store);
|
||||
CapacitySchedulerConfiguration csConf =
|
||||
new CapacitySchedulerConfiguration();
|
||||
csConf.setClass(YarnConfiguration.RM_SCHEDULER,
|
||||
CapacityScheduler.class, ResourceScheduler.class);
|
||||
CapacitySchedulerConfiguration csConf
|
||||
= new CapacitySchedulerConfiguration();
|
||||
setupQueueConfiguration(csConf);
|
||||
resourceManager.init(csConf);
|
||||
YarnConfiguration conf = new YarnConfiguration(csConf);
|
||||
conf.setClass(YarnConfiguration.RM_SCHEDULER,
|
||||
CapacityScheduler.class, ResourceScheduler.class);
|
||||
resourceManager.init(conf);
|
||||
((AsyncDispatcher)resourceManager.getRMContext().getDispatcher()).start();
|
||||
}
|
||||
|
||||
|
@ -244,6 +245,7 @@ public class TestCapacityScheduler {
|
|||
CapacityScheduler cs = new CapacityScheduler();
|
||||
CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
|
||||
setupQueueConfiguration(conf);
|
||||
cs.setConf(new YarnConfiguration());
|
||||
cs.reinitialize(conf, null, null);
|
||||
checkQueueCapacities(cs, A_CAPACITY, B_CAPACITY);
|
||||
|
||||
|
@ -332,6 +334,7 @@ public class TestCapacityScheduler {
|
|||
@Test(expected=IOException.class)
|
||||
public void testParseQueue() throws IOException {
|
||||
CapacityScheduler cs = new CapacityScheduler();
|
||||
cs.setConf(new YarnConfiguration());
|
||||
|
||||
CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
|
||||
setupQueueConfiguration(conf);
|
||||
|
@ -348,6 +351,7 @@ public class TestCapacityScheduler {
|
|||
new CapacitySchedulerConfiguration();
|
||||
setupQueueConfiguration(csConf);
|
||||
CapacityScheduler cs = new CapacityScheduler();
|
||||
cs.setConf(new YarnConfiguration());
|
||||
cs.reinitialize(csConf, null, null);
|
||||
|
||||
RMNode n1 = MockNodes.newNodeInfo(0, MockNodes.newResource(4 * GB), 1);
|
||||
|
|
|
@ -50,6 +50,7 @@ import org.apache.hadoop.yarn.api.records.QueueACL;
|
|||
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.factories.RecordFactory;
|
||||
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
||||
|
@ -95,10 +96,12 @@ public class TestLeafQueue {
|
|||
new CapacitySchedulerConfiguration();
|
||||
csConf.setBoolean("yarn.scheduler.capacity.user-metrics.enable", true);
|
||||
setupQueueConfiguration(csConf);
|
||||
|
||||
YarnConfiguration conf = new YarnConfiguration();
|
||||
cs.setConf(conf);
|
||||
|
||||
csContext = mock(CapacitySchedulerContext.class);
|
||||
when(csContext.getConfiguration()).thenReturn(csConf);
|
||||
when(csContext.getConf()).thenReturn(conf);
|
||||
when(csContext.getMinimumResourceCapability()).
|
||||
thenReturn(Resources.createResource(GB));
|
||||
when(csContext.getMaximumResourceCapability()).
|
||||
|
|
|
@ -29,6 +29,7 @@ import java.util.List;
|
|||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
|
||||
|
@ -50,6 +51,7 @@ public class TestParentQueue {
|
|||
private static final Log LOG = LogFactory.getLog(TestParentQueue.class);
|
||||
|
||||
RMContext rmContext;
|
||||
YarnConfiguration conf;
|
||||
CapacitySchedulerConfiguration csConf;
|
||||
CapacitySchedulerContext csContext;
|
||||
|
||||
|
@ -59,9 +61,11 @@ public class TestParentQueue {
|
|||
@Before
|
||||
public void setUp() throws Exception {
|
||||
rmContext = TestUtils.getMockRMContext();
|
||||
conf = new YarnConfiguration();
|
||||
csConf = new CapacitySchedulerConfiguration();
|
||||
|
||||
csContext = mock(CapacitySchedulerContext.class);
|
||||
when(csContext.getConf()).thenReturn(conf);
|
||||
when(csContext.getConfiguration()).thenReturn(csConf);
|
||||
when(csContext.getMinimumResourceCapability()).thenReturn(
|
||||
Resources.createResource(GB));
|
||||
|
|
|
@ -22,6 +22,7 @@ import junit.framework.Assert;
|
|||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
|
||||
import org.junit.Test;
|
||||
|
@ -34,10 +35,13 @@ public class TestQueueParsing {
|
|||
|
||||
@Test
|
||||
public void testQueueParsing() throws Exception {
|
||||
CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
|
||||
setupQueueConfiguration(conf);
|
||||
CapacitySchedulerConfiguration csConf =
|
||||
new CapacitySchedulerConfiguration();
|
||||
setupQueueConfiguration(csConf);
|
||||
YarnConfiguration conf = new YarnConfiguration(csConf);
|
||||
|
||||
CapacityScheduler capacityScheduler = new CapacityScheduler();
|
||||
capacityScheduler.setConf(conf);
|
||||
capacityScheduler.reinitialize(conf, null, null);
|
||||
|
||||
CSQueue a = capacityScheduler.getQueue("a");
|
||||
|
@ -133,6 +137,7 @@ public class TestQueueParsing {
|
|||
conf.setCapacity(CapacitySchedulerConfiguration.ROOT, 90);
|
||||
|
||||
CapacityScheduler capacityScheduler = new CapacityScheduler();
|
||||
capacityScheduler.setConf(new YarnConfiguration());
|
||||
capacityScheduler.reinitialize(conf, null, null);
|
||||
}
|
||||
|
||||
|
@ -155,6 +160,7 @@ public class TestQueueParsing {
|
|||
CapacityScheduler capacityScheduler;
|
||||
try {
|
||||
capacityScheduler = new CapacityScheduler();
|
||||
capacityScheduler.setConf(new YarnConfiguration());
|
||||
capacityScheduler.reinitialize(conf, null, null);
|
||||
} catch (IllegalArgumentException iae) {
|
||||
fail = true;
|
||||
|
@ -166,6 +172,7 @@ public class TestQueueParsing {
|
|||
|
||||
// Now this should work
|
||||
capacityScheduler = new CapacityScheduler();
|
||||
capacityScheduler.setConf(new YarnConfiguration());
|
||||
capacityScheduler.reinitialize(conf, null, null);
|
||||
|
||||
fail = false;
|
||||
|
|
|
@ -31,6 +31,7 @@ import java.util.concurrent.ConcurrentMap;
|
|||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
|
||||
|
@ -190,6 +191,7 @@ public class TestRMWebApp {
|
|||
setupQueueConfiguration(conf);
|
||||
|
||||
CapacityScheduler cs = new CapacityScheduler();
|
||||
cs.setConf(new YarnConfiguration());
|
||||
cs.reinitialize(conf, null, null);
|
||||
return cs;
|
||||
}
|
||||
|
@ -265,6 +267,7 @@ public class TestRMWebApp {
|
|||
setupFifoQueueConfiguration(conf);
|
||||
|
||||
FifoScheduler fs = new FifoScheduler();
|
||||
fs.setConf(new YarnConfiguration());
|
||||
fs.reinitialize(conf, null, null);
|
||||
return fs;
|
||||
}
|
||||
|
|
|
@ -61,6 +61,7 @@ public class TestRMWebServicesCapacitySched extends JerseyTest {
|
|||
|
||||
private static MockRM rm;
|
||||
private CapacitySchedulerConfiguration csConf;
|
||||
private YarnConfiguration conf;
|
||||
|
||||
private class QueueInfo {
|
||||
float capacity;
|
||||
|
@ -94,10 +95,11 @@ public class TestRMWebServicesCapacitySched extends JerseyTest {
|
|||
bind(RMWebServices.class);
|
||||
bind(GenericExceptionHandler.class);
|
||||
csConf = new CapacitySchedulerConfiguration();
|
||||
csConf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
|
||||
ResourceScheduler.class);
|
||||
setupQueueConfiguration(csConf);
|
||||
rm = new MockRM(csConf);
|
||||
conf = new YarnConfiguration(csConf);
|
||||
conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
|
||||
ResourceScheduler.class);
|
||||
rm = new MockRM(conf);
|
||||
bind(ResourceManager.class).toInstance(rm);
|
||||
bind(RMContext.class).toInstance(rm.getRMContext());
|
||||
bind(ApplicationACLsManager.class).toInstance(
|
||||
|
|
Loading…
Reference in New Issue