YARN-8213. Add Capacity Scheduler performance metrics. (Weiwei Yang via wangda)
Change-Id: Ieea6f3eeb83c90cd74233fea896f0fcd0f325d5f
This commit is contained in:
parent
8605a38514
commit
f24c842d52
|
@ -1216,6 +1216,7 @@ public class ResourceManager extends CompositeService implements Recoverable {
|
||||||
void reinitialize(boolean initialize) {
|
void reinitialize(boolean initialize) {
|
||||||
ClusterMetrics.destroy();
|
ClusterMetrics.destroy();
|
||||||
QueueMetrics.clearQueueMetrics();
|
QueueMetrics.clearQueueMetrics();
|
||||||
|
getResourceScheduler().resetSchedulerMetrics();
|
||||||
if (initialize) {
|
if (initialize) {
|
||||||
resetRMContext();
|
resetRMContext();
|
||||||
createAndInitActiveServices(true);
|
createAndInitActiveServices(true);
|
||||||
|
|
|
@ -1464,4 +1464,9 @@ public abstract class AbstractYarnScheduler
|
||||||
SchedulingRequest schedulingRequest, SchedulerNode schedulerNode) {
|
SchedulingRequest schedulingRequest, SchedulerNode schedulerNode) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void resetSchedulerMetrics() {
|
||||||
|
// reset scheduler metrics
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -71,4 +71,9 @@ public interface ResourceScheduler extends YarnScheduler, Recoverable {
|
||||||
*/
|
*/
|
||||||
boolean attemptAllocationOnNode(SchedulerApplicationAttempt appAttempt,
|
boolean attemptAllocationOnNode(SchedulerApplicationAttempt appAttempt,
|
||||||
SchedulingRequest schedulingRequest, SchedulerNode schedulerNode);
|
SchedulingRequest schedulingRequest, SchedulerNode schedulerNode);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Reset scheduler metrics.
|
||||||
|
*/
|
||||||
|
void resetSchedulerMetrics();
|
||||||
}
|
}
|
||||||
|
|
|
@ -1252,6 +1252,7 @@ public class CapacityScheduler extends
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void nodeUpdate(RMNode rmNode) {
|
protected void nodeUpdate(RMNode rmNode) {
|
||||||
|
long begin = System.nanoTime();
|
||||||
try {
|
try {
|
||||||
readLock.lock();
|
readLock.lock();
|
||||||
setLastNodeUpdateTime(Time.now());
|
setLastNodeUpdateTime(Time.now());
|
||||||
|
@ -1279,6 +1280,9 @@ public class CapacityScheduler extends
|
||||||
writeLock.unlock();
|
writeLock.unlock();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
long latency = System.nanoTime() - begin;
|
||||||
|
CapacitySchedulerMetrics.getMetrics().addNodeUpdate(latency);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -1643,17 +1647,28 @@ public class CapacityScheduler extends
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
long startTime = System.nanoTime();
|
||||||
|
|
||||||
// Backward compatible way to make sure previous behavior which allocation
|
// Backward compatible way to make sure previous behavior which allocation
|
||||||
// driven by node heartbeat works.
|
// driven by node heartbeat works.
|
||||||
FiCaSchedulerNode node = CandidateNodeSetUtils.getSingleNode(candidates);
|
FiCaSchedulerNode node = CandidateNodeSetUtils.getSingleNode(candidates);
|
||||||
|
|
||||||
// We have two different logics to handle allocation on single node / multi
|
// We have two different logics to handle allocation on single node / multi
|
||||||
// nodes.
|
// nodes.
|
||||||
|
CSAssignment assignment;
|
||||||
if (null != node) {
|
if (null != node) {
|
||||||
return allocateContainerOnSingleNode(candidates, node, withNodeHeartbeat);
|
assignment = allocateContainerOnSingleNode(candidates,
|
||||||
|
node, withNodeHeartbeat);
|
||||||
} else{
|
} else{
|
||||||
return allocateContainersOnMultiNodes(candidates);
|
assignment = allocateContainersOnMultiNodes(candidates);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (assignment != null && assignment.getAssignmentInformation() != null
|
||||||
|
&& assignment.getAssignmentInformation().getNumAllocations() > 0) {
|
||||||
|
long allocateTime = System.nanoTime() - startTime;
|
||||||
|
CapacitySchedulerMetrics.getMetrics().addAllocate(allocateTime);
|
||||||
|
}
|
||||||
|
return assignment;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -2806,6 +2821,7 @@ public class CapacityScheduler extends
|
||||||
@Override
|
@Override
|
||||||
public boolean tryCommit(Resource cluster, ResourceCommitRequest r,
|
public boolean tryCommit(Resource cluster, ResourceCommitRequest r,
|
||||||
boolean updatePending) {
|
boolean updatePending) {
|
||||||
|
long commitStart = System.nanoTime();
|
||||||
ResourceCommitRequest<FiCaSchedulerApp, FiCaSchedulerNode> request =
|
ResourceCommitRequest<FiCaSchedulerApp, FiCaSchedulerNode> request =
|
||||||
(ResourceCommitRequest<FiCaSchedulerApp, FiCaSchedulerNode>) r;
|
(ResourceCommitRequest<FiCaSchedulerApp, FiCaSchedulerNode>) r;
|
||||||
|
|
||||||
|
@ -2844,9 +2860,15 @@ public class CapacityScheduler extends
|
||||||
if (app != null && attemptId.equals(app.getApplicationAttemptId())) {
|
if (app != null && attemptId.equals(app.getApplicationAttemptId())) {
|
||||||
if (app.accept(cluster, request, updatePending)
|
if (app.accept(cluster, request, updatePending)
|
||||||
&& app.apply(cluster, request, updatePending)) {
|
&& app.apply(cluster, request, updatePending)) {
|
||||||
|
long commitSuccess = System.nanoTime() - commitStart;
|
||||||
|
CapacitySchedulerMetrics.getMetrics()
|
||||||
|
.addCommitSuccess(commitSuccess);
|
||||||
LOG.info("Allocation proposal accepted");
|
LOG.info("Allocation proposal accepted");
|
||||||
isSuccess = true;
|
isSuccess = true;
|
||||||
} else{
|
} else{
|
||||||
|
long commitFailed = System.nanoTime() - commitStart;
|
||||||
|
CapacitySchedulerMetrics.getMetrics()
|
||||||
|
.addCommitFailure(commitFailed);
|
||||||
LOG.info("Failed to accept allocation proposal");
|
LOG.info("Failed to accept allocation proposal");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -3029,4 +3051,9 @@ public class CapacityScheduler extends
|
||||||
}
|
}
|
||||||
return autoCreatedLeafQueue;
|
return autoCreatedLeafQueue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void resetSchedulerMetrics() {
|
||||||
|
CapacitySchedulerMetrics.destroy();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,119 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
|
||||||
|
|
||||||
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
|
import org.apache.hadoop.metrics2.MetricsInfo;
|
||||||
|
import org.apache.hadoop.metrics2.MetricsSystem;
|
||||||
|
import org.apache.hadoop.metrics2.annotation.Metric;
|
||||||
|
import org.apache.hadoop.metrics2.annotation.Metrics;
|
||||||
|
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
|
||||||
|
import org.apache.hadoop.metrics2.lib.MetricsRegistry;
|
||||||
|
import org.apache.hadoop.metrics2.lib.MutableRate;
|
||||||
|
|
||||||
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.metrics2.lib.Interns.info;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Metrics for capacity scheduler.
|
||||||
|
*/
|
||||||
|
@InterfaceAudience.Private
|
||||||
|
@Metrics(context="yarn")
|
||||||
|
public class CapacitySchedulerMetrics {
|
||||||
|
|
||||||
|
private static AtomicBoolean isInitialized = new AtomicBoolean(false);
|
||||||
|
|
||||||
|
private static final MetricsInfo RECORD_INFO =
|
||||||
|
info("CapacitySchedulerMetrics",
|
||||||
|
"Metrics for the Yarn Capacity Scheduler");
|
||||||
|
|
||||||
|
@Metric("Scheduler allocate containers") MutableRate allocate;
|
||||||
|
@Metric("Scheduler commit success") MutableRate commitSuccess;
|
||||||
|
@Metric("Scheduler commit failure") MutableRate commitFailure;
|
||||||
|
@Metric("Scheduler node update") MutableRate nodeUpdate;
|
||||||
|
|
||||||
|
private static volatile CapacitySchedulerMetrics INSTANCE = null;
|
||||||
|
private static MetricsRegistry registry;
|
||||||
|
|
||||||
|
public static CapacitySchedulerMetrics getMetrics() {
|
||||||
|
if(!isInitialized.get()){
|
||||||
|
synchronized (CapacitySchedulerMetrics.class) {
|
||||||
|
if(INSTANCE == null){
|
||||||
|
INSTANCE = new CapacitySchedulerMetrics();
|
||||||
|
registerMetrics();
|
||||||
|
isInitialized.set(true);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return INSTANCE;
|
||||||
|
}
|
||||||
|
|
||||||
|
private static void registerMetrics() {
|
||||||
|
registry = new MetricsRegistry(RECORD_INFO);
|
||||||
|
registry.tag(RECORD_INFO, "ResourceManager");
|
||||||
|
MetricsSystem ms = DefaultMetricsSystem.instance();
|
||||||
|
if (ms != null) {
|
||||||
|
ms.register("CapacitySchedulerMetrics",
|
||||||
|
"Metrics for the Yarn Capacity Scheduler", INSTANCE);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
public synchronized static void destroy() {
|
||||||
|
isInitialized.set(false);
|
||||||
|
INSTANCE = null;
|
||||||
|
MetricsSystem ms = DefaultMetricsSystem.instance();
|
||||||
|
if (ms != null) {
|
||||||
|
ms.unregisterSource("CapacitySchedulerMetrics");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void addAllocate(long latency) {
|
||||||
|
this.allocate.add(latency);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void addCommitSuccess(long latency) {
|
||||||
|
this.commitSuccess.add(latency);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void addCommitFailure(long latency) {
|
||||||
|
this.commitFailure.add(latency);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void addNodeUpdate(long latency) {
|
||||||
|
this.nodeUpdate.add(latency);
|
||||||
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
public long getNumOfNodeUpdate() {
|
||||||
|
return this.nodeUpdate.lastStat().numSamples();
|
||||||
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
public long getNumOfAllocates() {
|
||||||
|
return this.allocate.lastStat().numSamples();
|
||||||
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
public long getNumOfCommitSuccess() {
|
||||||
|
return this.commitSuccess.lastStat().numSamples();
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,110 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.resourcemanager;
|
||||||
|
|
||||||
|
import org.apache.hadoop.test.GenericTestUtils;
|
||||||
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerMetrics;
|
||||||
|
import org.junit.After;
|
||||||
|
import org.junit.Assert;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.concurrent.TimeoutException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test class for CS metrics.
|
||||||
|
*/
|
||||||
|
public class TestCapacitySchedulerMetrics {
|
||||||
|
|
||||||
|
private MockRM rm;
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testCSMetrics() throws Exception {
|
||||||
|
YarnConfiguration conf = new YarnConfiguration();
|
||||||
|
conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
|
||||||
|
ResourceScheduler.class);
|
||||||
|
conf.setBoolean(
|
||||||
|
CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_ENABLE, true);
|
||||||
|
|
||||||
|
RMNodeLabelsManager mgr = new NullRMNodeLabelsManager();
|
||||||
|
mgr.init(conf);
|
||||||
|
rm = new MockRM(conf) {
|
||||||
|
@Override
|
||||||
|
public RMNodeLabelsManager createNodeLabelManager() {
|
||||||
|
return mgr;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
rm.getRMContext().setNodeLabelManager(mgr);
|
||||||
|
rm.start();
|
||||||
|
|
||||||
|
MockNM nm1 = rm.registerNode("host1:1234", 2048);
|
||||||
|
MockNM nm2 = rm.registerNode("host2:1234", 2048);
|
||||||
|
nm1.nodeHeartbeat(true);
|
||||||
|
nm2.nodeHeartbeat(true);
|
||||||
|
|
||||||
|
CapacitySchedulerMetrics csMetrics = CapacitySchedulerMetrics.getMetrics();
|
||||||
|
Assert.assertNotNull(csMetrics);
|
||||||
|
try {
|
||||||
|
GenericTestUtils.waitFor(()
|
||||||
|
-> csMetrics.getNumOfNodeUpdate() == 2, 100, 3000);
|
||||||
|
} catch(TimeoutException e) {
|
||||||
|
Assert.fail("CS metrics not updated on node-update events.");
|
||||||
|
}
|
||||||
|
|
||||||
|
Assert.assertEquals(0, csMetrics.getNumOfAllocates());
|
||||||
|
Assert.assertEquals(0, csMetrics.getNumOfCommitSuccess());
|
||||||
|
|
||||||
|
RMApp rmApp = rm.submitApp(1024, "app", "user", null, false,
|
||||||
|
"default", 1, null, null, false);
|
||||||
|
MockAM am = MockRM.launchAMWhenAsyncSchedulingEnabled(rmApp, rm);
|
||||||
|
am.registerAppAttempt();
|
||||||
|
am.allocate("*", 1024, 1, new ArrayList<>());
|
||||||
|
|
||||||
|
nm1.nodeHeartbeat(true);
|
||||||
|
nm2.nodeHeartbeat(true);
|
||||||
|
|
||||||
|
// Verify HB metrics updated
|
||||||
|
try {
|
||||||
|
GenericTestUtils.waitFor(()
|
||||||
|
-> csMetrics.getNumOfNodeUpdate() == 4, 100, 3000);
|
||||||
|
} catch(TimeoutException e) {
|
||||||
|
Assert.fail("CS metrics not updated on node-update events.");
|
||||||
|
}
|
||||||
|
|
||||||
|
// For async mode, the number of alloc might be bigger than 1
|
||||||
|
Assert.assertTrue(csMetrics.getNumOfAllocates() > 0);
|
||||||
|
// But there will be only 2 successful commit (1 AM + 1 task)
|
||||||
|
Assert.assertEquals(2, csMetrics.getNumOfCommitSuccess());
|
||||||
|
}
|
||||||
|
|
||||||
|
@After
|
||||||
|
public void tearDown() {
|
||||||
|
if (rm != null) {
|
||||||
|
rm.stop();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue