YARN-8213. Add Capacity Scheduler performance metrics. (Weiwei Yang via wangda)

Change-Id: Ieea6f3eeb83c90cd74233fea896f0fcd0f325d5f
(cherry picked from commit f24c842d52)
This commit is contained in:
Wangda Tan 2018-05-25 21:53:20 -07:00
parent c7d77a3bfe
commit 4d41cb1696
6 changed files with 269 additions and 2 deletions

View File

@ -1216,6 +1216,7 @@ public class ResourceManager extends CompositeService implements Recoverable {
void reinitialize(boolean initialize) {
ClusterMetrics.destroy();
QueueMetrics.clearQueueMetrics();
getResourceScheduler().resetSchedulerMetrics();
if (initialize) {
resetRMContext();
createAndInitActiveServices(true);

View File

@ -1464,4 +1464,9 @@ public abstract class AbstractYarnScheduler
SchedulingRequest schedulingRequest, SchedulerNode schedulerNode) {
return false;
}
@Override
public void resetSchedulerMetrics() {
// reset scheduler metrics
}
}

View File

@ -71,4 +71,9 @@ public interface ResourceScheduler extends YarnScheduler, Recoverable {
*/
boolean attemptAllocationOnNode(SchedulerApplicationAttempt appAttempt,
SchedulingRequest schedulingRequest, SchedulerNode schedulerNode);
/**
* Reset scheduler metrics.
*/
void resetSchedulerMetrics();
}

View File

@ -1215,6 +1215,7 @@ public class CapacityScheduler extends
@Override
protected void nodeUpdate(RMNode rmNode) {
long begin = System.nanoTime();
try {
readLock.lock();
setLastNodeUpdateTime(Time.now());
@ -1242,6 +1243,9 @@ public class CapacityScheduler extends
writeLock.unlock();
}
}
long latency = System.nanoTime() - begin;
CapacitySchedulerMetrics.getMetrics().addNodeUpdate(latency);
}
/**
@ -1606,17 +1610,28 @@ public class CapacityScheduler extends
return null;
}
long startTime = System.nanoTime();
// Backward compatible way to make sure previous behavior which allocation
// driven by node heartbeat works.
FiCaSchedulerNode node = CandidateNodeSetUtils.getSingleNode(candidates);
// We have two different logics to handle allocation on single node / multi
// nodes.
CSAssignment assignment;
if (null != node) {
return allocateContainerOnSingleNode(candidates, node, withNodeHeartbeat);
assignment = allocateContainerOnSingleNode(candidates,
node, withNodeHeartbeat);
} else{
return allocateContainersOnMultiNodes(candidates);
assignment = allocateContainersOnMultiNodes(candidates);
}
if (assignment != null && assignment.getAssignmentInformation() != null
&& assignment.getAssignmentInformation().getNumAllocations() > 0) {
long allocateTime = System.nanoTime() - startTime;
CapacitySchedulerMetrics.getMetrics().addAllocate(allocateTime);
}
return assignment;
}
@Override
@ -2769,6 +2784,7 @@ public class CapacityScheduler extends
@Override
public boolean tryCommit(Resource cluster, ResourceCommitRequest r,
boolean updatePending) {
long commitStart = System.nanoTime();
ResourceCommitRequest<FiCaSchedulerApp, FiCaSchedulerNode> request =
(ResourceCommitRequest<FiCaSchedulerApp, FiCaSchedulerNode>) r;
@ -2807,9 +2823,15 @@ public class CapacityScheduler extends
if (app != null && attemptId.equals(app.getApplicationAttemptId())) {
if (app.accept(cluster, request, updatePending)
&& app.apply(cluster, request, updatePending)) {
long commitSuccess = System.nanoTime() - commitStart;
CapacitySchedulerMetrics.getMetrics()
.addCommitSuccess(commitSuccess);
LOG.info("Allocation proposal accepted");
isSuccess = true;
} else{
long commitFailed = System.nanoTime() - commitStart;
CapacitySchedulerMetrics.getMetrics()
.addCommitFailure(commitFailed);
LOG.info("Failed to accept allocation proposal");
}
@ -2992,4 +3014,9 @@ public class CapacityScheduler extends
}
return autoCreatedLeafQueue;
}
@Override
public void resetSchedulerMetrics() {
CapacitySchedulerMetrics.destroy();
}
}

View File

@ -0,0 +1,119 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.metrics2.MetricsInfo;
import org.apache.hadoop.metrics2.MetricsSystem;
import org.apache.hadoop.metrics2.annotation.Metric;
import org.apache.hadoop.metrics2.annotation.Metrics;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.metrics2.lib.MetricsRegistry;
import org.apache.hadoop.metrics2.lib.MutableRate;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.apache.hadoop.metrics2.lib.Interns.info;
/**
* Metrics for capacity scheduler.
*/
@InterfaceAudience.Private
@Metrics(context="yarn")
public class CapacitySchedulerMetrics {
private static AtomicBoolean isInitialized = new AtomicBoolean(false);
private static final MetricsInfo RECORD_INFO =
info("CapacitySchedulerMetrics",
"Metrics for the Yarn Capacity Scheduler");
@Metric("Scheduler allocate containers") MutableRate allocate;
@Metric("Scheduler commit success") MutableRate commitSuccess;
@Metric("Scheduler commit failure") MutableRate commitFailure;
@Metric("Scheduler node update") MutableRate nodeUpdate;
private static volatile CapacitySchedulerMetrics INSTANCE = null;
private static MetricsRegistry registry;
public static CapacitySchedulerMetrics getMetrics() {
if(!isInitialized.get()){
synchronized (CapacitySchedulerMetrics.class) {
if(INSTANCE == null){
INSTANCE = new CapacitySchedulerMetrics();
registerMetrics();
isInitialized.set(true);
}
}
}
return INSTANCE;
}
private static void registerMetrics() {
registry = new MetricsRegistry(RECORD_INFO);
registry.tag(RECORD_INFO, "ResourceManager");
MetricsSystem ms = DefaultMetricsSystem.instance();
if (ms != null) {
ms.register("CapacitySchedulerMetrics",
"Metrics for the Yarn Capacity Scheduler", INSTANCE);
}
}
@VisibleForTesting
public synchronized static void destroy() {
isInitialized.set(false);
INSTANCE = null;
MetricsSystem ms = DefaultMetricsSystem.instance();
if (ms != null) {
ms.unregisterSource("CapacitySchedulerMetrics");
}
}
public void addAllocate(long latency) {
this.allocate.add(latency);
}
public void addCommitSuccess(long latency) {
this.commitSuccess.add(latency);
}
public void addCommitFailure(long latency) {
this.commitFailure.add(latency);
}
public void addNodeUpdate(long latency) {
this.nodeUpdate.add(latency);
}
@VisibleForTesting
public long getNumOfNodeUpdate() {
return this.nodeUpdate.lastStat().numSamples();
}
@VisibleForTesting
public long getNumOfAllocates() {
return this.allocate.lastStat().numSamples();
}
@VisibleForTesting
public long getNumOfCommitSuccess() {
return this.commitSuccess.lastStat().numSamples();
}
}

View File

@ -0,0 +1,110 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerMetrics;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
import java.util.ArrayList;
import java.util.concurrent.TimeoutException;
/**
* Test class for CS metrics.
*/
public class TestCapacitySchedulerMetrics {
private MockRM rm;
@Test
public void testCSMetrics() throws Exception {
YarnConfiguration conf = new YarnConfiguration();
conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
ResourceScheduler.class);
conf.setBoolean(
CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_ENABLE, true);
RMNodeLabelsManager mgr = new NullRMNodeLabelsManager();
mgr.init(conf);
rm = new MockRM(conf) {
@Override
public RMNodeLabelsManager createNodeLabelManager() {
return mgr;
}
};
rm.getRMContext().setNodeLabelManager(mgr);
rm.start();
MockNM nm1 = rm.registerNode("host1:1234", 2048);
MockNM nm2 = rm.registerNode("host2:1234", 2048);
nm1.nodeHeartbeat(true);
nm2.nodeHeartbeat(true);
CapacitySchedulerMetrics csMetrics = CapacitySchedulerMetrics.getMetrics();
Assert.assertNotNull(csMetrics);
try {
GenericTestUtils.waitFor(()
-> csMetrics.getNumOfNodeUpdate() == 2, 100, 3000);
} catch(TimeoutException e) {
Assert.fail("CS metrics not updated on node-update events.");
}
Assert.assertEquals(0, csMetrics.getNumOfAllocates());
Assert.assertEquals(0, csMetrics.getNumOfCommitSuccess());
RMApp rmApp = rm.submitApp(1024, "app", "user", null, false,
"default", 1, null, null, false);
MockAM am = MockRM.launchAMWhenAsyncSchedulingEnabled(rmApp, rm);
am.registerAppAttempt();
am.allocate("*", 1024, 1, new ArrayList<>());
nm1.nodeHeartbeat(true);
nm2.nodeHeartbeat(true);
// Verify HB metrics updated
try {
GenericTestUtils.waitFor(()
-> csMetrics.getNumOfNodeUpdate() == 4, 100, 3000);
} catch(TimeoutException e) {
Assert.fail("CS metrics not updated on node-update events.");
}
// For async mode, the number of alloc might be bigger than 1
Assert.assertTrue(csMetrics.getNumOfAllocates() > 0);
// But there will be only 2 successful commit (1 AM + 1 task)
Assert.assertEquals(2, csMetrics.getNumOfCommitSuccess());
}
@After
public void tearDown() {
if (rm != null) {
rm.stop();
}
}
}