HDFS-16313. RBF: Add metrics for each sub cluster (#3638)

This commit is contained in:
Symious 2021-11-16 09:47:25 +08:00 committed by GitHub
parent b933f5f54b
commit 89fcbd84f9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 344 additions and 19 deletions

View File

@ -17,6 +17,8 @@
*/
package org.apache.hadoop.hdfs.server.federation.metrics;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
@ -26,6 +28,7 @@ import javax.management.ObjectName;
import javax.management.StandardMBean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.server.federation.router.FederationUtil;
import org.apache.hadoop.hdfs.server.federation.router.RouterRpcMonitor;
import org.apache.hadoop.hdfs.server.federation.router.RouterRpcServer;
import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;
@ -61,6 +64,9 @@ public class FederationRPCPerformanceMonitor implements RouterRpcMonitor {
/** JMX interface to monitor the RPC metrics. */
private FederationRPCMetrics metrics;
/** JMX interface to monitor each Nameservice RPC metrics. */
private Map<String, NameserviceRPCMetrics> nameserviceRPCMetricsMap =
new ConcurrentHashMap<>();
private ObjectName registeredBean;
/** Thread pool for logging stats. */
@ -77,6 +83,11 @@ public class FederationRPCPerformanceMonitor implements RouterRpcMonitor {
// Create metrics
this.metrics = FederationRPCMetrics.create(conf, server);
for (String nameservice : FederationUtil.getAllConfiguredNS(conf)) {
LOG.info("Create Nameservice RPC Metrics for " + nameservice);
this.nameserviceRPCMetricsMap.computeIfAbsent(nameservice,
k -> NameserviceRPCMetrics.create(conf, k));
}
// Create thread pool
ThreadFactory threadFactory = new ThreadFactoryBuilder()
@ -136,27 +147,41 @@ public class FederationRPCPerformanceMonitor implements RouterRpcMonitor {
}
@Override
public void proxyOpComplete(boolean success) {
public void proxyOpComplete(boolean success, String nsId) {
if (success) {
long proxyTime = getProxyTime();
if (metrics != null && proxyTime >= 0) {
metrics.addProxyTime(proxyTime);
if (proxyTime >= 0) {
if (metrics != null) {
metrics.addProxyTime(proxyTime);
}
if (nameserviceRPCMetricsMap != null &&
nameserviceRPCMetricsMap.containsKey(nsId)) {
nameserviceRPCMetricsMap.get(nsId).addProxyTime(proxyTime);
}
}
}
}
@Override
public void proxyOpFailureStandby() {
public void proxyOpFailureStandby(String nsId) {
if (metrics != null) {
metrics.incrProxyOpFailureStandby();
}
if (nameserviceRPCMetricsMap != null &&
nameserviceRPCMetricsMap.containsKey(nsId)) {
nameserviceRPCMetricsMap.get(nsId).incrProxyOpFailureStandby();
}
}
@Override
public void proxyOpFailureCommunicate() {
public void proxyOpFailureCommunicate(String nsId) {
if (metrics != null) {
metrics.incrProxyOpFailureCommunicate();
}
if (nameserviceRPCMetricsMap != null &&
nameserviceRPCMetricsMap.containsKey(nsId)) {
nameserviceRPCMetricsMap.get(nsId).incrProxyOpFailureCommunicate();
}
}
@Override
@ -181,10 +206,14 @@ public class FederationRPCPerformanceMonitor implements RouterRpcMonitor {
}
@Override
public void proxyOpNoNamenodes() {
public void proxyOpNoNamenodes(String nsId) {
if (metrics != null) {
metrics.incrProxyOpNoNamenodes();
}
if (nameserviceRPCMetricsMap != null &&
nameserviceRPCMetricsMap.containsKey(nsId)) {
nameserviceRPCMetricsMap.get(nsId).incrProxyOpNoNamenodes();
}
}
@Override

View File

@ -0,0 +1,40 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation.metrics;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
/**
* JMX interface for the RPC server of Nameservice.
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
public interface NameserviceRPCMBean {
long getProxyOps();
double getProxyAvg();
long getProxyOpFailureCommunicate();
long getProxyOpFailureStandby();
long getProxyOpNoNamenodes();
}

View File

@ -0,0 +1,118 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation.metrics;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.metrics2.MetricsSystem;
import org.apache.hadoop.metrics2.annotation.Metric;
import org.apache.hadoop.metrics2.annotation.Metrics;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.metrics2.lib.MutableCounterLong;
import org.apache.hadoop.metrics2.lib.MutableRate;
import java.util.concurrent.ThreadLocalRandom;
/**
* Implementation of the Nameservice RPC metrics collector.
*/
@Metrics(name = "NameserviceRPCActivity", about = "Nameservice RPC Activity",
context = "dfs")
public class NameserviceRPCMetrics implements NameserviceRPCMBean {
public final static String NAMESERVICE_RPC_METRICS_PREFIX = "NameserviceActivity-";
private final String nsId;
@Metric("Time for the Router to proxy an operation to the Nameservice")
private MutableRate proxy;
@Metric("Number of operations the Router proxied to a NameService")
private MutableCounterLong proxyOp;
@Metric("Number of operations to hit a standby NN")
private MutableCounterLong proxyOpFailureStandby;
@Metric("Number of operations to fail to reach NN")
private MutableCounterLong proxyOpFailureCommunicate;
@Metric("Number of operations to hit no namenodes available")
private MutableCounterLong proxyOpNoNamenodes;
public NameserviceRPCMetrics(Configuration conf, String nsId) {
this.nsId = nsId;
}
public static NameserviceRPCMetrics create(Configuration conf,
String nameService) {
MetricsSystem ms = DefaultMetricsSystem.instance();
String name = NAMESERVICE_RPC_METRICS_PREFIX + (nameService.isEmpty()
? "UndefinedNameService"+ ThreadLocalRandom.current().nextInt()
: nameService);
return ms.register(name, "HDFS Federation NameService RPC Metrics",
new NameserviceRPCMetrics(conf, name));
}
public void incrProxyOpFailureStandby() {
proxyOpFailureStandby.incr();
}
@Override
public long getProxyOpFailureStandby() {
return proxyOpFailureStandby.value();
}
public void incrProxyOpFailureCommunicate() {
proxyOpFailureCommunicate.incr();
}
@Override
public long getProxyOpFailureCommunicate() {
return proxyOpFailureCommunicate.value();
}
public void incrProxyOpNoNamenodes() {
proxyOpNoNamenodes.incr();
}
@Override
public long getProxyOpNoNamenodes() {
return proxyOpNoNamenodes.value();
}
/**
* Add the time to proxy an operation from the moment the Router sends it to
* the Namenode until it replied.
* @param time Proxy time of an operation in nanoseconds.
*/
public void addProxyTime(long time) {
proxy.add(time);
proxyOp.incr();
}
@Override
public double getProxyAvg() {
return proxy.lastStat().mean();
}
@Override
public long getProxyOps() {
return proxyOp.value();
}
public String getNsId() {
return this.nsId;
}
}

View File

@ -489,7 +489,7 @@ public class RouterRpcClient {
namenodeResolver.updateActiveNamenode(nsId, address);
}
if (this.rpcMonitor != null) {
this.rpcMonitor.proxyOpComplete(true);
this.rpcMonitor.proxyOpComplete(true, nsId);
}
if (this.router.getRouterClientMetrics() != null) {
this.router.getRouterClientMetrics().incInvokedMethod(method);
@ -500,17 +500,17 @@ public class RouterRpcClient {
if (ioe instanceof StandbyException) {
// Fail over indicated by retry policy and/or NN
if (this.rpcMonitor != null) {
this.rpcMonitor.proxyOpFailureStandby();
this.rpcMonitor.proxyOpFailureStandby(nsId);
}
failover = true;
} else if (isUnavailableException(ioe)) {
if (this.rpcMonitor != null) {
this.rpcMonitor.proxyOpFailureCommunicate();
this.rpcMonitor.proxyOpFailureCommunicate(nsId);
}
failover = true;
} else if (ioe instanceof RemoteException) {
if (this.rpcMonitor != null) {
this.rpcMonitor.proxyOpComplete(true);
this.rpcMonitor.proxyOpComplete(true, nsId);
}
RemoteException re = (RemoteException) ioe;
ioe = re.unwrapRemoteException();
@ -519,7 +519,7 @@ public class RouterRpcClient {
throw ioe;
} else if (ioe instanceof ConnectionNullException) {
if (this.rpcMonitor != null) {
this.rpcMonitor.proxyOpFailureCommunicate();
this.rpcMonitor.proxyOpFailureCommunicate(nsId);
}
LOG.error("Get connection for {} {} error: {}", nsId, rpcAddress,
ioe.getMessage());
@ -529,7 +529,7 @@ public class RouterRpcClient {
throw se;
} else if (ioe instanceof NoNamenodesAvailableException) {
if (this.rpcMonitor != null) {
this.rpcMonitor.proxyOpNoNamenodes();
this.rpcMonitor.proxyOpNoNamenodes(nsId);
}
LOG.error("Cannot get available namenode for {} {} error: {}",
nsId, rpcAddress, ioe.getMessage());
@ -539,8 +539,8 @@ public class RouterRpcClient {
// Other communication error, this is a failure
// Communication retries are handled by the retry policy
if (this.rpcMonitor != null) {
this.rpcMonitor.proxyOpFailureCommunicate();
this.rpcMonitor.proxyOpComplete(false);
this.rpcMonitor.proxyOpFailureCommunicate(nsId);
this.rpcMonitor.proxyOpComplete(false, nsId);
}
throw ioe;
}
@ -551,7 +551,7 @@ public class RouterRpcClient {
}
}
if (this.rpcMonitor != null) {
this.rpcMonitor.proxyOpComplete(false);
this.rpcMonitor.proxyOpComplete(false, null);
}
// All namenodes were unavailable or in standby

View File

@ -62,18 +62,18 @@ public interface RouterRpcMonitor {
* Mark a proxy operation as completed.
* @param success If the operation was successful.
*/
void proxyOpComplete(boolean success);
void proxyOpComplete(boolean success, String nsId);
/**
* Failed to proxy an operation to a Namenode because it was in standby.
*/
void proxyOpFailureStandby();
void proxyOpFailureStandby(String nsId);
/**
* Failed to proxy an operation to a Namenode because of an unexpected
* exception.
*/
void proxyOpFailureCommunicate();
void proxyOpFailureCommunicate(String nsId);
/**
* Failed to proxy an operation to a Namenode because the client was
@ -95,7 +95,7 @@ public interface RouterRpcMonitor {
/**
* Failed to proxy an operation because of no namenodes available.
*/
void proxyOpNoNamenodes();
void proxyOpNoNamenodes(String nsId);
/**
* If the Router cannot contact the State Store in an operation.

View File

@ -0,0 +1,138 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation.metrics;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster;
import org.apache.hadoop.hdfs.server.federation.MockResolver;
import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
import org.apache.hadoop.hdfs.server.federation.router.Router;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import java.io.IOException;
import static org.apache.hadoop.hdfs.server.federation.metrics.NameserviceRPCMetrics.NAMESERVICE_RPC_METRICS_PREFIX;
import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
/**
* Test case for RouterClientMetrics.
*/
public class TestNameserviceRPCMetrics {
private static final Configuration CONF = new HdfsConfiguration();
static {
CONF.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 100);
CONF.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, 1);
CONF.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1L);
CONF.setInt(DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_KEY, 1);
}
private static final int NUM_SUBCLUSTERS = 2;
private static final int NUM_DNS = 3;
/** Federated HDFS cluster. */
private static MiniRouterDFSCluster cluster;
/** The first Router Context for this federated cluster. */
private MiniRouterDFSCluster.RouterContext routerContext;
/** The first Router for this federated cluster. */
private Router router;
/** Filesystem interface to the Router. */
private FileSystem routerFS;
/** Filesystem interface to the Namenode. */
private FileSystem nnFS;
@BeforeClass
public static void globalSetUp() throws Exception {
cluster = new MiniRouterDFSCluster(false, NUM_SUBCLUSTERS);
cluster.setNumDatanodesPerNameservice(NUM_DNS);
cluster.startCluster();
Configuration routerConf = new RouterConfigBuilder()
.metrics()
.rpc()
.quota()
.build();
cluster.addRouterOverrides(routerConf);
cluster.startRouters();
// Register and verify all NNs with all routers
cluster.registerNamenodes();
cluster.waitNamenodeRegistration();
}
@Before
public void testSetup() throws Exception {
// Create mock locations
cluster.installMockLocations();
// Delete all files via the NNs and verify
cluster.deleteAllFiles();
// Create test fixtures on NN
cluster.createTestDirectoriesNamenode();
// Wait to ensure NN has fully created its test directories
Thread.sleep(100);
routerContext = cluster.getRouters().get(0);
this.routerFS = routerContext.getFileSystem();
// Add extra location to the root mount / such that the root mount points:
// /
// ns0 -> /target-ns0
// ns1 -> /target-ns1
router = routerContext.getRouter();
MockResolver resolver = (MockResolver) router.getSubclusterResolver();
resolver.addLocation("/target-ns0", cluster.getNameservices().get(0), "/target-ns0");
resolver.addLocation("/target-ns1", cluster.getNameservices().get(1), "/target-ns1");
}
@AfterClass
public static void tearDown() throws Exception {
cluster.shutdown();
}
@Test
public void testProxyOp() throws IOException {
routerFS.listStatus(new Path("/target-ns0"));
assertCounter("ProxyOp", 1L,
getMetrics(NAMESERVICE_RPC_METRICS_PREFIX + "ns0"));
assertCounter("ProxyOp", 0L,
getMetrics(NAMESERVICE_RPC_METRICS_PREFIX + "ns1"));
routerFS.listStatus(new Path("/target-ns1"));
assertCounter("ProxyOp", 1L,
getMetrics(NAMESERVICE_RPC_METRICS_PREFIX + "ns0"));
assertCounter("ProxyOp", 1L,
getMetrics(NAMESERVICE_RPC_METRICS_PREFIX + "ns1"));
}
}