YARN-10541. capture the performance metrics of ZKRMStateStore (#2568)
(cherry picked from commit fa4cf91b57)
This commit is contained in:
parent
f6b9f82b3f
commit
cd5ee0014f
|
@ -19,6 +19,8 @@
|
||||||
package org.apache.hadoop.yarn.server.resourcemanager.recovery;
|
package org.apache.hadoop.yarn.server.resourcemanager.recovery;
|
||||||
|
|
||||||
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
|
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
|
||||||
|
import org.apache.hadoop.yarn.util.Clock;
|
||||||
|
import org.apache.hadoop.yarn.util.SystemClock;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
import org.apache.curator.framework.CuratorFramework;
|
import org.apache.curator.framework.CuratorFramework;
|
||||||
|
@ -234,6 +236,10 @@ public class ZKRMStateStore extends RMStateStore {
|
||||||
/** Manager for the ZooKeeper connection. */
|
/** Manager for the ZooKeeper connection. */
|
||||||
private ZKCuratorManager zkManager;
|
private ZKCuratorManager zkManager;
|
||||||
|
|
||||||
|
private volatile Clock clock = SystemClock.getInstance();
|
||||||
|
@VisibleForTesting
|
||||||
|
protected ZKRMStateStoreOpDurations opDurations;
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Indicates different app attempt state store operations.
|
* Indicates different app attempt state store operations.
|
||||||
*/
|
*/
|
||||||
|
@ -329,6 +335,8 @@ public class ZKRMStateStore extends RMStateStore {
|
||||||
appIdNodeSplitIndex = YarnConfiguration.DEFAULT_ZK_APPID_NODE_SPLIT_INDEX;
|
appIdNodeSplitIndex = YarnConfiguration.DEFAULT_ZK_APPID_NODE_SPLIT_INDEX;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
opDurations = ZKRMStateStoreOpDurations.getInstance();
|
||||||
|
|
||||||
zkAcl = ZKCuratorManager.getZKAcls(conf);
|
zkAcl = ZKCuratorManager.getZKAcls(conf);
|
||||||
|
|
||||||
if (HAUtil.isHAEnabled(conf)) {
|
if (HAUtil.isHAEnabled(conf)) {
|
||||||
|
@ -518,6 +526,7 @@ public class ZKRMStateStore extends RMStateStore {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public synchronized RMState loadState() throws Exception {
|
public synchronized RMState loadState() throws Exception {
|
||||||
|
long start = clock.getTime();
|
||||||
RMState rmState = new RMState();
|
RMState rmState = new RMState();
|
||||||
// recover DelegationTokenSecretManager
|
// recover DelegationTokenSecretManager
|
||||||
loadRMDTSecretManagerState(rmState);
|
loadRMDTSecretManagerState(rmState);
|
||||||
|
@ -529,6 +538,7 @@ public class ZKRMStateStore extends RMStateStore {
|
||||||
loadReservationSystemState(rmState);
|
loadReservationSystemState(rmState);
|
||||||
// recover ProxyCAManager state
|
// recover ProxyCAManager state
|
||||||
loadProxyCAManagerState(rmState);
|
loadProxyCAManagerState(rmState);
|
||||||
|
opDurations.addLoadStateCallDuration(clock.getTime() - start);
|
||||||
return rmState;
|
return rmState;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -834,6 +844,7 @@ public class ZKRMStateStore extends RMStateStore {
|
||||||
@Override
|
@Override
|
||||||
public synchronized void storeApplicationStateInternal(ApplicationId appId,
|
public synchronized void storeApplicationStateInternal(ApplicationId appId,
|
||||||
ApplicationStateData appStateDataPB) throws Exception {
|
ApplicationStateData appStateDataPB) throws Exception {
|
||||||
|
long start = clock.getTime();
|
||||||
String nodeCreatePath = getLeafAppIdNodePath(appId.toString(), true);
|
String nodeCreatePath = getLeafAppIdNodePath(appId.toString(), true);
|
||||||
|
|
||||||
LOG.debug("Storing info for app: {} at: {}", appId, nodeCreatePath);
|
LOG.debug("Storing info for app: {} at: {}", appId, nodeCreatePath);
|
||||||
|
@ -850,12 +861,14 @@ public class ZKRMStateStore extends RMStateStore {
|
||||||
+ " exceeds the maximum allowed size for application data. "
|
+ " exceeds the maximum allowed size for application data. "
|
||||||
+ "See yarn.resourcemanager.zk-max-znode-size.bytes.");
|
+ "See yarn.resourcemanager.zk-max-znode-size.bytes.");
|
||||||
}
|
}
|
||||||
|
opDurations.addStoreApplicationStateCallDuration(clock.getTime() - start);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected synchronized void updateApplicationStateInternal(
|
protected synchronized void updateApplicationStateInternal(
|
||||||
ApplicationId appId, ApplicationStateData appStateDataPB)
|
ApplicationId appId, ApplicationStateData appStateDataPB)
|
||||||
throws Exception {
|
throws Exception {
|
||||||
|
long start = clock.getTime();
|
||||||
String nodeUpdatePath = getLeafAppIdNodePath(appId.toString(), false);
|
String nodeUpdatePath = getLeafAppIdNodePath(appId.toString(), false);
|
||||||
boolean pathExists = true;
|
boolean pathExists = true;
|
||||||
// Look for paths based on other split indices if path as per split index
|
// Look for paths based on other split indices if path as per split index
|
||||||
|
@ -892,6 +905,7 @@ public class ZKRMStateStore extends RMStateStore {
|
||||||
LOG.debug("Path {} for {} didn't exist. Creating a new znode to update"
|
LOG.debug("Path {} for {} didn't exist. Creating a new znode to update"
|
||||||
+ " the application state.", nodeUpdatePath, appId);
|
+ " the application state.", nodeUpdatePath, appId);
|
||||||
}
|
}
|
||||||
|
opDurations.addUpdateApplicationStateCallDuration(clock.getTime() - start);
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
@ -976,8 +990,10 @@ public class ZKRMStateStore extends RMStateStore {
|
||||||
@Override
|
@Override
|
||||||
protected synchronized void removeApplicationStateInternal(
|
protected synchronized void removeApplicationStateInternal(
|
||||||
ApplicationStateData appState) throws Exception {
|
ApplicationStateData appState) throws Exception {
|
||||||
|
long start = clock.getTime();
|
||||||
removeApp(appState.getApplicationSubmissionContext().
|
removeApp(appState.getApplicationSubmissionContext().
|
||||||
getApplicationId().toString(), true, appState.attempts.keySet());
|
getApplicationId().toString(), true, appState.attempts.keySet());
|
||||||
|
opDurations.addRemoveApplicationStateCallDuration(clock.getTime() - start);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void removeApp(String removeAppId) throws Exception {
|
private void removeApp(String removeAppId) throws Exception {
|
||||||
|
|
|
@ -0,0 +1,97 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.yarn.server.resourcemanager.recovery;
|
||||||
|
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
|
import org.apache.hadoop.classification.InterfaceStability;
|
||||||
|
import org.apache.hadoop.metrics2.MetricsCollector;
|
||||||
|
import org.apache.hadoop.metrics2.MetricsInfo;
|
||||||
|
import org.apache.hadoop.metrics2.MetricsSource;
|
||||||
|
import org.apache.hadoop.metrics2.MetricsSystem;
|
||||||
|
import org.apache.hadoop.metrics2.annotation.Metric;
|
||||||
|
import org.apache.hadoop.metrics2.annotation.Metrics;
|
||||||
|
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
|
||||||
|
import org.apache.hadoop.metrics2.lib.MetricsRegistry;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.metrics2.lib.Interns.info;
|
||||||
|
import org.apache.hadoop.metrics2.lib.MutableRate;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Class to capture the performance metrics of ZKRMStateStore.
|
||||||
|
* This should be a singleton.
|
||||||
|
*/
|
||||||
|
@InterfaceAudience.Private
|
||||||
|
@InterfaceStability.Unstable
|
||||||
|
@Metrics(context="ZKRMStateStore-op-durations")
|
||||||
|
public final class ZKRMStateStoreOpDurations implements MetricsSource {
|
||||||
|
|
||||||
|
@Metric("Duration for a load state call")
|
||||||
|
MutableRate loadStateCall;
|
||||||
|
|
||||||
|
@Metric("Duration for a store application state call")
|
||||||
|
MutableRate storeApplicationStateCall;
|
||||||
|
|
||||||
|
@Metric("Duration for a update application state call")
|
||||||
|
MutableRate updateApplicationStateCall;
|
||||||
|
|
||||||
|
@Metric("Duration to handle a remove application state call")
|
||||||
|
MutableRate removeApplicationStateCall;
|
||||||
|
|
||||||
|
protected static final MetricsInfo RECORD_INFO =
|
||||||
|
info("ZKRMStateStoreOpDurations", "Durations of ZKRMStateStore calls");
|
||||||
|
|
||||||
|
private final MetricsRegistry registry;
|
||||||
|
|
||||||
|
private static final ZKRMStateStoreOpDurations INSTANCE
|
||||||
|
= new ZKRMStateStoreOpDurations();
|
||||||
|
|
||||||
|
public static ZKRMStateStoreOpDurations getInstance() {
|
||||||
|
return INSTANCE;
|
||||||
|
}
|
||||||
|
|
||||||
|
private ZKRMStateStoreOpDurations() {
|
||||||
|
registry = new MetricsRegistry(RECORD_INFO);
|
||||||
|
registry.tag(RECORD_INFO, "ZKRMStateStoreOpDurations");
|
||||||
|
|
||||||
|
MetricsSystem ms = DefaultMetricsSystem.instance();
|
||||||
|
if (ms != null) {
|
||||||
|
ms.register(RECORD_INFO.name(), RECORD_INFO.description(), this);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public synchronized void getMetrics(MetricsCollector collector, boolean all) {
|
||||||
|
registry.snapshot(collector.addRecord(registry.info()), all);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void addLoadStateCallDuration(long value) {
|
||||||
|
loadStateCall.add(value);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void addStoreApplicationStateCallDuration(long value) {
|
||||||
|
storeApplicationStateCall.add(value);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void addUpdateApplicationStateCallDuration(long value) {
|
||||||
|
updateApplicationStateCall.add(value);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void addRemoveApplicationStateCallDuration(long value) {
|
||||||
|
removeApplicationStateCall.add(value);
|
||||||
|
}
|
||||||
|
}
|
|
@ -22,6 +22,9 @@ import org.apache.curator.framework.CuratorFramework;
|
||||||
import org.apache.curator.framework.CuratorFrameworkFactory;
|
import org.apache.curator.framework.CuratorFrameworkFactory;
|
||||||
import org.apache.curator.retry.RetryNTimes;
|
import org.apache.curator.retry.RetryNTimes;
|
||||||
import org.apache.curator.test.TestingServer;
|
import org.apache.curator.test.TestingServer;
|
||||||
|
import org.apache.hadoop.metrics2.MetricsRecord;
|
||||||
|
import org.apache.hadoop.metrics2.impl.MetricsCollectorImpl;
|
||||||
|
import org.apache.hadoop.metrics2.impl.MetricsRecords;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
@ -1567,4 +1570,40 @@ public class TestZKRMStateStore extends RMStateStoreTestBase {
|
||||||
Collections.emptyMap(), ctx.getApplicationSchedulingPropertiesMap());
|
Collections.emptyMap(), ctx.getApplicationSchedulingPropertiesMap());
|
||||||
store.close();
|
store.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testMetricsInited() throws Exception {
|
||||||
|
TestZKRMStateStoreTester zkTester = new TestZKRMStateStoreTester();
|
||||||
|
Configuration conf = createConfForDelegationTokenNodeSplit(1);
|
||||||
|
MetricsCollectorImpl collector = new MetricsCollectorImpl();
|
||||||
|
ZKRMStateStoreOpDurations opDurations =
|
||||||
|
((ZKRMStateStore)zkTester.getRMStateStore(conf)).opDurations;
|
||||||
|
|
||||||
|
long anyDuration = 10;
|
||||||
|
opDurations.addLoadStateCallDuration(anyDuration);
|
||||||
|
opDurations.addStoreApplicationStateCallDuration(anyDuration);
|
||||||
|
opDurations.addUpdateApplicationStateCallDuration(anyDuration);
|
||||||
|
opDurations.addRemoveApplicationStateCallDuration(anyDuration);
|
||||||
|
|
||||||
|
Thread.sleep(110);
|
||||||
|
|
||||||
|
opDurations.getMetrics(collector, true);
|
||||||
|
assertEquals("Incorrect number of perf metrics", 1,
|
||||||
|
collector.getRecords().size());
|
||||||
|
MetricsRecord record = collector.getRecords().get(0);
|
||||||
|
MetricsRecords.assertTag(record,
|
||||||
|
ZKRMStateStoreOpDurations.RECORD_INFO.name(),
|
||||||
|
"ZKRMStateStoreOpDurations");
|
||||||
|
|
||||||
|
double expectAvgTime = anyDuration;
|
||||||
|
MetricsRecords.assertMetric(record,
|
||||||
|
"LoadStateCallAvgTime", expectAvgTime);
|
||||||
|
MetricsRecords.assertMetric(record,
|
||||||
|
"StoreApplicationStateCallAvgTime", expectAvgTime);
|
||||||
|
MetricsRecords.assertMetric(record,
|
||||||
|
"UpdateApplicationStateCallAvgTime", expectAvgTime);
|
||||||
|
MetricsRecords.assertMetric(record,
|
||||||
|
"RemoveApplicationStateCallAvgTime", expectAvgTime);
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue