From f24c842d52e166e8566337ef93c96438f1c870d8 Mon Sep 17 00:00:00 2001 From: Wangda Tan Date: Fri, 25 May 2018 21:53:20 -0700 Subject: [PATCH] YARN-8213. Add Capacity Scheduler performance metrics. (Weiwei Yang via wangda) Change-Id: Ieea6f3eeb83c90cd74233fea896f0fcd0f325d5f --- .../resourcemanager/ResourceManager.java | 1 + .../scheduler/AbstractYarnScheduler.java | 5 + .../scheduler/ResourceScheduler.java | 5 + .../scheduler/capacity/CapacityScheduler.java | 31 ++++- .../capacity/CapacitySchedulerMetrics.java | 119 ++++++++++++++++++ .../TestCapacitySchedulerMetrics.java | 110 ++++++++++++++++ 6 files changed, 269 insertions(+), 2 deletions(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerMetrics.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestCapacitySchedulerMetrics.java diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java index 05745ec272e..c53311127c0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java @@ -1216,6 +1216,7 @@ public class ResourceManager extends CompositeService implements Recoverable { void reinitialize(boolean initialize) { ClusterMetrics.destroy(); QueueMetrics.clearQueueMetrics(); + getResourceScheduler().resetSchedulerMetrics(); if (initialize) { resetRMContext(); createAndInitActiveServices(true); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java index b2747f71ada..18c7b4eb7d9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java @@ -1464,4 +1464,9 @@ public abstract class AbstractYarnScheduler SchedulingRequest schedulingRequest, SchedulerNode schedulerNode) { return false; } + + @Override + public void resetSchedulerMetrics() { + // reset scheduler metrics + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceScheduler.java index 5a56ac7eec2..dcb6edd3ba3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceScheduler.java @@ -71,4 +71,9 @@ public interface ResourceScheduler extends YarnScheduler, Recoverable { */ boolean attemptAllocationOnNode(SchedulerApplicationAttempt appAttempt, SchedulingRequest schedulingRequest, SchedulerNode schedulerNode); + + /** + * Reset scheduler metrics. + */ + void resetSchedulerMetrics(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 162d3bb99cf..1c9bf6bb6e9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -1252,6 +1252,7 @@ public class CapacityScheduler extends @Override protected void nodeUpdate(RMNode rmNode) { + long begin = System.nanoTime(); try { readLock.lock(); setLastNodeUpdateTime(Time.now()); @@ -1279,6 +1280,9 @@ public class CapacityScheduler extends writeLock.unlock(); } } + + long latency = System.nanoTime() - begin; + CapacitySchedulerMetrics.getMetrics().addNodeUpdate(latency); } /** @@ -1643,17 +1647,28 @@ public class CapacityScheduler extends return null; } + long startTime = System.nanoTime(); + // Backward compatible way to make sure previous behavior which allocation // driven by node heartbeat works. FiCaSchedulerNode node = CandidateNodeSetUtils.getSingleNode(candidates); // We have two different logics to handle allocation on single node / multi // nodes. + CSAssignment assignment; if (null != node) { - return allocateContainerOnSingleNode(candidates, node, withNodeHeartbeat); + assignment = allocateContainerOnSingleNode(candidates, + node, withNodeHeartbeat); } else{ - return allocateContainersOnMultiNodes(candidates); + assignment = allocateContainersOnMultiNodes(candidates); } + + if (assignment != null && assignment.getAssignmentInformation() != null + && assignment.getAssignmentInformation().getNumAllocations() > 0) { + long allocateTime = System.nanoTime() - startTime; + CapacitySchedulerMetrics.getMetrics().addAllocate(allocateTime); + } + return assignment; } @Override @@ -2806,6 +2821,7 @@ public class CapacityScheduler extends @Override public boolean tryCommit(Resource cluster, ResourceCommitRequest r, boolean updatePending) { + long commitStart = System.nanoTime(); ResourceCommitRequest request = (ResourceCommitRequest) r; @@ -2844,9 +2860,15 @@ public class CapacityScheduler extends if (app != null && attemptId.equals(app.getApplicationAttemptId())) { if (app.accept(cluster, request, updatePending) && app.apply(cluster, request, updatePending)) { + long commitSuccess = System.nanoTime() - commitStart; + CapacitySchedulerMetrics.getMetrics() + .addCommitSuccess(commitSuccess); LOG.info("Allocation proposal accepted"); isSuccess = true; } else{ + long commitFailed = System.nanoTime() - commitStart; + CapacitySchedulerMetrics.getMetrics() + .addCommitFailure(commitFailed); LOG.info("Failed to accept allocation proposal"); } @@ -3029,4 +3051,9 @@ public class CapacityScheduler extends } return autoCreatedLeafQueue; } + + @Override + public void resetSchedulerMetrics() { + CapacitySchedulerMetrics.destroy(); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerMetrics.java new file mode 100644 index 00000000000..5f8988b0778 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerMetrics.java @@ -0,0 +1,119 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.metrics2.MetricsInfo; +import org.apache.hadoop.metrics2.MetricsSystem; +import org.apache.hadoop.metrics2.annotation.Metric; +import org.apache.hadoop.metrics2.annotation.Metrics; +import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; +import org.apache.hadoop.metrics2.lib.MetricsRegistry; +import org.apache.hadoop.metrics2.lib.MutableRate; + +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.apache.hadoop.metrics2.lib.Interns.info; + +/** + * Metrics for capacity scheduler. + */ +@InterfaceAudience.Private +@Metrics(context="yarn") +public class CapacitySchedulerMetrics { + + private static AtomicBoolean isInitialized = new AtomicBoolean(false); + + private static final MetricsInfo RECORD_INFO = + info("CapacitySchedulerMetrics", + "Metrics for the Yarn Capacity Scheduler"); + + @Metric("Scheduler allocate containers") MutableRate allocate; + @Metric("Scheduler commit success") MutableRate commitSuccess; + @Metric("Scheduler commit failure") MutableRate commitFailure; + @Metric("Scheduler node update") MutableRate nodeUpdate; + + private static volatile CapacitySchedulerMetrics INSTANCE = null; + private static MetricsRegistry registry; + + public static CapacitySchedulerMetrics getMetrics() { + if(!isInitialized.get()){ + synchronized (CapacitySchedulerMetrics.class) { + if(INSTANCE == null){ + INSTANCE = new CapacitySchedulerMetrics(); + registerMetrics(); + isInitialized.set(true); + } + } + } + return INSTANCE; + } + + private static void registerMetrics() { + registry = new MetricsRegistry(RECORD_INFO); + registry.tag(RECORD_INFO, "ResourceManager"); + MetricsSystem ms = DefaultMetricsSystem.instance(); + if (ms != null) { + ms.register("CapacitySchedulerMetrics", + "Metrics for the Yarn Capacity Scheduler", INSTANCE); + } + } + + @VisibleForTesting + public synchronized static void destroy() { + isInitialized.set(false); + INSTANCE = null; + MetricsSystem ms = DefaultMetricsSystem.instance(); + if (ms != null) { + ms.unregisterSource("CapacitySchedulerMetrics"); + } + } + + public void addAllocate(long latency) { + this.allocate.add(latency); + } + + public void addCommitSuccess(long latency) { + this.commitSuccess.add(latency); + } + + public void addCommitFailure(long latency) { + this.commitFailure.add(latency); + } + + public void addNodeUpdate(long latency) { + this.nodeUpdate.add(latency); + } + + @VisibleForTesting + public long getNumOfNodeUpdate() { + return this.nodeUpdate.lastStat().numSamples(); + } + + @VisibleForTesting + public long getNumOfAllocates() { + return this.allocate.lastStat().numSamples(); + } + + @VisibleForTesting + public long getNumOfCommitSuccess() { + return this.commitSuccess.lastStat().numSamples(); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestCapacitySchedulerMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestCapacitySchedulerMetrics.java new file mode 100644 index 00000000000..eaa966afac3 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestCapacitySchedulerMetrics.java @@ -0,0 +1,110 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager; + +import org.apache.hadoop.test.GenericTestUtils; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerMetrics; +import org.junit.After; +import org.junit.Assert; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.concurrent.TimeoutException; + +/** + * Test class for CS metrics. + */ +public class TestCapacitySchedulerMetrics { + + private MockRM rm; + + @Test + public void testCSMetrics() throws Exception { + YarnConfiguration conf = new YarnConfiguration(); + conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, + ResourceScheduler.class); + conf.setBoolean( + CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_ENABLE, true); + + RMNodeLabelsManager mgr = new NullRMNodeLabelsManager(); + mgr.init(conf); + rm = new MockRM(conf) { + @Override + public RMNodeLabelsManager createNodeLabelManager() { + return mgr; + } + }; + + rm.getRMContext().setNodeLabelManager(mgr); + rm.start(); + + MockNM nm1 = rm.registerNode("host1:1234", 2048); + MockNM nm2 = rm.registerNode("host2:1234", 2048); + nm1.nodeHeartbeat(true); + nm2.nodeHeartbeat(true); + + CapacitySchedulerMetrics csMetrics = CapacitySchedulerMetrics.getMetrics(); + Assert.assertNotNull(csMetrics); + try { + GenericTestUtils.waitFor(() + -> csMetrics.getNumOfNodeUpdate() == 2, 100, 3000); + } catch(TimeoutException e) { + Assert.fail("CS metrics not updated on node-update events."); + } + + Assert.assertEquals(0, csMetrics.getNumOfAllocates()); + Assert.assertEquals(0, csMetrics.getNumOfCommitSuccess()); + + RMApp rmApp = rm.submitApp(1024, "app", "user", null, false, + "default", 1, null, null, false); + MockAM am = MockRM.launchAMWhenAsyncSchedulingEnabled(rmApp, rm); + am.registerAppAttempt(); + am.allocate("*", 1024, 1, new ArrayList<>()); + + nm1.nodeHeartbeat(true); + nm2.nodeHeartbeat(true); + + // Verify HB metrics updated + try { + GenericTestUtils.waitFor(() + -> csMetrics.getNumOfNodeUpdate() == 4, 100, 3000); + } catch(TimeoutException e) { + Assert.fail("CS metrics not updated on node-update events."); + } + + // For async mode, the number of alloc might be bigger than 1 + Assert.assertTrue(csMetrics.getNumOfAllocates() > 0); + // But there will be only 2 successful commit (1 AM + 1 task) + Assert.assertEquals(2, csMetrics.getNumOfCommitSuccess()); + } + + @After + public void tearDown() { + if (rm != null) { + rm.stop(); + } + } +}