YARN-5929. Missing scheduling policy in the FS queue metric. (Contributed by Yufei Gu via Daniel Templeton)
This commit is contained in:
parent
2ff84a0040
commit
5bd18c49bd
|
@ -18,6 +18,7 @@
|
||||||
|
|
||||||
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
|
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
|
||||||
|
|
||||||
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.metrics2.MetricsSystem;
|
import org.apache.hadoop.metrics2.MetricsSystem;
|
||||||
import org.apache.hadoop.metrics2.annotation.Metric;
|
import org.apache.hadoop.metrics2.annotation.Metric;
|
||||||
|
@ -169,6 +170,12 @@ public class FSQueueMetrics extends QueueMetrics {
|
||||||
amResourceUsageVCores.set(resource.getVirtualCores());
|
amResourceUsageVCores.set(resource.getVirtualCores());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the scheduling policy.
|
||||||
|
*
|
||||||
|
* @return the scheduling policy
|
||||||
|
*/
|
||||||
|
@Metric("Scheduling policy")
|
||||||
public String getSchedulingPolicy() {
|
public String getSchedulingPolicy() {
|
||||||
return schedulingPolicy;
|
return schedulingPolicy;
|
||||||
}
|
}
|
||||||
|
@ -181,6 +188,24 @@ public class FSQueueMetrics extends QueueMetrics {
|
||||||
static FSQueueMetrics forQueue(String queueName, Queue parent,
|
static FSQueueMetrics forQueue(String queueName, Queue parent,
|
||||||
boolean enableUserMetrics, Configuration conf) {
|
boolean enableUserMetrics, Configuration conf) {
|
||||||
MetricsSystem ms = DefaultMetricsSystem.instance();
|
MetricsSystem ms = DefaultMetricsSystem.instance();
|
||||||
|
return forQueue(ms, queueName, parent, enableUserMetrics, conf);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the FS queue metric for the given queue. Create one and register it to
|
||||||
|
* metrics system if there isn't one for the queue.
|
||||||
|
*
|
||||||
|
* @param ms the metric system
|
||||||
|
* @param queueName queue name
|
||||||
|
* @param parent parent queue
|
||||||
|
* @param enableUserMetrics if user metrics is needed
|
||||||
|
* @param conf configuration
|
||||||
|
* @return a FSQueueMetrics object
|
||||||
|
*/
|
||||||
|
@VisibleForTesting
|
||||||
|
public synchronized
|
||||||
|
static FSQueueMetrics forQueue(MetricsSystem ms, String queueName,
|
||||||
|
Queue parent, boolean enableUserMetrics, Configuration conf) {
|
||||||
QueueMetrics metrics = queueMetrics.get(queueName);
|
QueueMetrics metrics = queueMetrics.get(queueName);
|
||||||
if (metrics == null) {
|
if (metrics == null) {
|
||||||
metrics = new FSQueueMetrics(ms, queueName, parent, enableUserMetrics, conf)
|
metrics = new FSQueueMetrics(ms, queueName, parent, enableUserMetrics, conf)
|
||||||
|
@ -189,13 +214,12 @@ public class FSQueueMetrics extends QueueMetrics {
|
||||||
// Register with the MetricsSystems
|
// Register with the MetricsSystems
|
||||||
if (ms != null) {
|
if (ms != null) {
|
||||||
metrics = ms.register(
|
metrics = ms.register(
|
||||||
sourceName(queueName).toString(),
|
sourceName(queueName).toString(),
|
||||||
"Metrics for queue: " + queueName, metrics);
|
"Metrics for queue: " + queueName, metrics);
|
||||||
}
|
}
|
||||||
queueMetrics.put(queueName, metrics);
|
queueMetrics.put(queueName, metrics);
|
||||||
}
|
}
|
||||||
|
|
||||||
return (FSQueueMetrics)metrics;
|
return (FSQueueMetrics)metrics;
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,69 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
|
||||||
|
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.metrics2.MetricsSource;
|
||||||
|
import org.apache.hadoop.metrics2.MetricsSystem;
|
||||||
|
import org.apache.hadoop.metrics2.impl.MetricsCollectorImpl;
|
||||||
|
import org.apache.hadoop.metrics2.impl.MetricsRecords;
|
||||||
|
import org.apache.hadoop.metrics2.impl.MetricsSystemImpl;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestQueueMetrics;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The test class for {@link FSQueueMetrics}.
|
||||||
|
*/
|
||||||
|
public class TestFSQueueMetrics {
|
||||||
|
private static final Configuration CONF = new Configuration();
|
||||||
|
|
||||||
|
private MetricsSystem ms;
|
||||||
|
|
||||||
|
@Before public void setUp() {
|
||||||
|
ms = new MetricsSystemImpl();
|
||||||
|
QueueMetrics.clearQueueMetrics();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test if the metric scheduling policy is set correctly.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testSchedulingPolicy() {
|
||||||
|
String queueName = "single";
|
||||||
|
|
||||||
|
FSQueueMetrics metrics = FSQueueMetrics.forQueue(ms, queueName, null, false,
|
||||||
|
CONF);
|
||||||
|
metrics.setSchedulingPolicy("drf");
|
||||||
|
checkSchedulingPolicy(queueName, "drf");
|
||||||
|
|
||||||
|
// test resetting the scheduling policy
|
||||||
|
metrics.setSchedulingPolicy("fair");
|
||||||
|
checkSchedulingPolicy(queueName, "fair");
|
||||||
|
}
|
||||||
|
|
||||||
|
private void checkSchedulingPolicy(String queueName, String policy) {
|
||||||
|
MetricsSource queueSource = TestQueueMetrics.queueSource(ms, queueName);
|
||||||
|
MetricsCollectorImpl collector = new MetricsCollectorImpl();
|
||||||
|
queueSource.getMetrics(collector, true);
|
||||||
|
MetricsRecords.assertTag(collector.getRecords().get(0), "SchedulingPolicy",
|
||||||
|
policy);
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue