YARN-4878. Expose scheduling policy and max running apps over JMX for Yarn queues. (Yufei Gu via kasha)
This commit is contained in:
parent
57c31a3fef
commit
f979d779e1
@ -70,6 +70,10 @@ public FSQueue(String name, FairScheduler scheduler, FSParentQueue parent) {
|
|||||||
this.metrics = FSQueueMetrics.forQueue(getName(), parent, true, scheduler.getConf());
|
this.metrics = FSQueueMetrics.forQueue(getName(), parent, true, scheduler.getConf());
|
||||||
metrics.setMinShare(getMinShare());
|
metrics.setMinShare(getMinShare());
|
||||||
metrics.setMaxShare(getMaxShare());
|
metrics.setMaxShare(getMaxShare());
|
||||||
|
|
||||||
|
AllocationConfiguration allocConf = scheduler.getAllocationConfiguration();
|
||||||
|
metrics.setMaxApps(allocConf.getQueueMaxApps(name));
|
||||||
|
metrics.setSchedulingPolicy(allocConf.getSchedulingPolicy(name).getName());
|
||||||
this.parent = parent;
|
this.parent = parent;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -39,7 +39,10 @@ public class FSQueueMetrics extends QueueMetrics {
|
|||||||
@Metric("Minimum share of CPU in vcores") MutableGaugeInt minShareVCores;
|
@Metric("Minimum share of CPU in vcores") MutableGaugeInt minShareVCores;
|
||||||
@Metric("Maximum share of memory in MB") MutableGaugeInt maxShareMB;
|
@Metric("Maximum share of memory in MB") MutableGaugeInt maxShareMB;
|
||||||
@Metric("Maximum share of CPU in vcores") MutableGaugeInt maxShareVCores;
|
@Metric("Maximum share of CPU in vcores") MutableGaugeInt maxShareVCores;
|
||||||
|
@Metric("Maximum number of applications") MutableGaugeInt maxApps;
|
||||||
|
|
||||||
|
private String schedulingPolicy;
|
||||||
|
|
||||||
FSQueueMetrics(MetricsSystem ms, String queueName, Queue parent,
|
FSQueueMetrics(MetricsSystem ms, String queueName, Queue parent,
|
||||||
boolean enableUserMetrics, Configuration conf) {
|
boolean enableUserMetrics, Configuration conf) {
|
||||||
super(ms, queueName, parent, enableUserMetrics, conf);
|
super(ms, queueName, parent, enableUserMetrics, conf);
|
||||||
@ -96,8 +99,24 @@ public int getMaxShareMB() {
|
|||||||
public int getMaxShareVirtualCores() {
|
public int getMaxShareVirtualCores() {
|
||||||
return maxShareVCores.value();
|
return maxShareVCores.value();
|
||||||
}
|
}
|
||||||
|
|
||||||
public synchronized
|
public int getMaxApps() {
|
||||||
|
return maxApps.value();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setMaxApps(int max) {
|
||||||
|
maxApps.set(max);
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getSchedulingPolicy() {
|
||||||
|
return schedulingPolicy;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setSchedulingPolicy(String policy) {
|
||||||
|
schedulingPolicy = policy;
|
||||||
|
}
|
||||||
|
|
||||||
|
public synchronized
|
||||||
static FSQueueMetrics forQueue(String queueName, Queue parent,
|
static FSQueueMetrics forQueue(String queueName, Queue parent,
|
||||||
boolean enableUserMetrics, Configuration conf) {
|
boolean enableUserMetrics, Configuration conf) {
|
||||||
MetricsSystem ms = DefaultMetricsSystem.instance();
|
MetricsSystem ms = DefaultMetricsSystem.instance();
|
||||||
|
@ -430,11 +430,14 @@ public void updateAllocationConfiguration(AllocationConfiguration queueConf) {
|
|||||||
FSQueueMetrics queueMetrics = queue.getMetrics();
|
FSQueueMetrics queueMetrics = queue.getMetrics();
|
||||||
queueMetrics.setMinShare(queue.getMinShare());
|
queueMetrics.setMinShare(queue.getMinShare());
|
||||||
queueMetrics.setMaxShare(queue.getMaxShare());
|
queueMetrics.setMaxShare(queue.getMaxShare());
|
||||||
// Set scheduling policies
|
// Set scheduling policies and update queue metrics
|
||||||
try {
|
try {
|
||||||
SchedulingPolicy policy = queueConf.getSchedulingPolicy(queue.getName());
|
SchedulingPolicy policy = queueConf.getSchedulingPolicy(queue.getName());
|
||||||
policy.initialize(scheduler.getClusterResource());
|
policy.initialize(scheduler.getClusterResource());
|
||||||
queue.setPolicy(policy);
|
queue.setPolicy(policy);
|
||||||
|
|
||||||
|
queueMetrics.setMaxApps(queueConf.getQueueMaxApps(queue.getName()));
|
||||||
|
queueMetrics.setSchedulingPolicy(policy.getName());
|
||||||
} catch (AllocationConfigurationException ex) {
|
} catch (AllocationConfigurationException ex) {
|
||||||
LOG.warn("Cannot apply configured scheduling policy to queue "
|
LOG.warn("Cannot apply configured scheduling policy to queue "
|
||||||
+ queue.getName(), ex);
|
+ queue.getName(), ex);
|
||||||
|
@ -86,7 +86,14 @@ public void testUpdateDemand() {
|
|||||||
String queueName = "root.queue1";
|
String queueName = "root.queue1";
|
||||||
when(scheduler.allocConf.getMaxResources(queueName)).thenReturn(maxResource);
|
when(scheduler.allocConf.getMaxResources(queueName)).thenReturn(maxResource);
|
||||||
when(scheduler.allocConf.getMinResources(queueName)).thenReturn(Resources.none());
|
when(scheduler.allocConf.getMinResources(queueName)).thenReturn(Resources.none());
|
||||||
|
when(scheduler.allocConf.getQueueMaxApps(queueName)).
|
||||||
|
thenReturn(Integer.MAX_VALUE);
|
||||||
|
when(scheduler.allocConf.getSchedulingPolicy(queueName))
|
||||||
|
.thenReturn(SchedulingPolicy.DEFAULT_POLICY);
|
||||||
FSLeafQueue schedulable = new FSLeafQueue(queueName, scheduler, null);
|
FSLeafQueue schedulable = new FSLeafQueue(queueName, scheduler, null);
|
||||||
|
assertEquals(schedulable.getMetrics().getMaxApps(), Integer.MAX_VALUE);
|
||||||
|
assertEquals(schedulable.getMetrics().getSchedulingPolicy(),
|
||||||
|
SchedulingPolicy.DEFAULT_POLICY.getName());
|
||||||
|
|
||||||
FSAppAttempt app = mock(FSAppAttempt.class);
|
FSAppAttempt app = mock(FSAppAttempt.class);
|
||||||
Mockito.when(app.getDemand()).thenReturn(maxResource);
|
Mockito.when(app.getDemand()).thenReturn(maxResource);
|
||||||
@ -118,6 +125,11 @@ public void test() throws Exception {
|
|||||||
resourceManager = new MockRM(conf);
|
resourceManager = new MockRM(conf);
|
||||||
resourceManager.start();
|
resourceManager.start();
|
||||||
scheduler = (FairScheduler) resourceManager.getResourceScheduler();
|
scheduler = (FairScheduler) resourceManager.getResourceScheduler();
|
||||||
|
for(FSQueue queue: scheduler.getQueueManager().getQueues()) {
|
||||||
|
assertEquals(queue.getMetrics().getMaxApps(), Integer.MAX_VALUE);
|
||||||
|
assertEquals(queue.getMetrics().getSchedulingPolicy(),
|
||||||
|
SchedulingPolicy.DEFAULT_POLICY.getName());
|
||||||
|
}
|
||||||
|
|
||||||
// Add one big node (only care about aggregate capacity)
|
// Add one big node (only care about aggregate capacity)
|
||||||
RMNode node1 =
|
RMNode node1 =
|
||||||
|
Loading…
x
Reference in New Issue
Block a user