HDFS-14084. Need for more stats in DFSClient. Contributed by Pranay Singh.

Signed-off-by: Wei-Chiu Chuang <weichiu@apache.org>
This commit is contained in:
Pranay Singh 2019-01-25 09:01:44 -08:00 committed by Wei-Chiu Chuang
parent 45caeee6cf
commit 1d523279da
4 changed files with 198 additions and 13 deletions

View File

@ -34,6 +34,7 @@ import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.io.retry.RetryPolicy.RetryAction;
import org.apache.hadoop.ipc.metrics.RpcDetailedMetrics;
import org.apache.hadoop.ipc.RPC.RpcKind;
import org.apache.hadoop.ipc.Server.AuthProtocol;
import org.apache.hadoop.ipc.protobuf.IpcConnectionContextProtos.IpcConnectionContextProto;
@ -84,8 +85,9 @@ import static org.apache.hadoop.ipc.RpcConstants.PING_CALL_ID;
@Public
@InterfaceStability.Evolving
public class Client implements AutoCloseable {
public static final Logger LOG = LoggerFactory.getLogger(Client.class);
private final RpcDetailedMetrics rpcDetailedMetrics;
/** A counter for generating call IDs. */
private static final AtomicInteger callIdCounter = new AtomicInteger();
@ -203,14 +205,32 @@ public class Client implements AutoCloseable {
}
clientExecutor = null;
}
return clientExecutor;
}
};
/**
* Update a particular metric by recording the processing
* time of the metric.
*
* @param name Metric name
* @param processingTime time spent in processing the metric.
*/
public void updateMetrics(String name, long processingTime) {
rpcDetailedMetrics.addProcessingTime(name, processingTime);
}
/**
* Get the RpcDetailedMetrics associated with the Client.
*/
public RpcDetailedMetrics getRpcDetailedMetrics() {
return rpcDetailedMetrics;
}
/**
* set the ping interval value in configuration
*
*
* @param conf Configuration
* @param pingInterval the ping interval
*/
@ -1314,6 +1334,12 @@ public class Client implements AutoCloseable {
this.maxAsyncCalls = conf.getInt(
CommonConfigurationKeys.IPC_CLIENT_ASYNC_CALLS_MAX_KEY,
CommonConfigurationKeys.IPC_CLIENT_ASYNC_CALLS_MAX_DEFAULT);
/**
* Create with client id as argument, this differs from server
* which takes port as an argument.
*/
this.rpcDetailedMetrics =
RpcDetailedMetrics.create(Arrays.toString(this.clientId));
}
/**

View File

@ -49,6 +49,8 @@ import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.metrics2.MetricStringBuilder;
import org.apache.hadoop.metrics2.lib.MutableRatesWithAggregation;
/**
* RPC Engine for for protobuf based RPCs.
@ -195,9 +197,9 @@ public class ProtobufRpcEngine implements RpcEngine {
throws ServiceException {
long startTime = 0;
if (LOG.isDebugEnabled()) {
startTime = Time.now();
startTime = System.currentTimeMillis();
}
if (args.length != 2) { // RpcController + Message
throw new ServiceException(
"Too many or few parameters for request. Method: ["
@ -250,10 +252,18 @@ public class ProtobufRpcEngine implements RpcEngine {
}
if (LOG.isDebugEnabled()) {
long callTime = Time.now() - startTime;
LOG.debug("Call: " + method.getName() + " took " + callTime + "ms");
long callTime = System.currentTimeMillis() - startTime;
if (callTime > 0) {
MetricStringBuilder rb =
new MetricStringBuilder(null, "", " = ", "\n");
client.updateMetrics(method.getName(), callTime);
MutableRatesWithAggregation rates =
client.getRpcDetailedMetrics().getMutableRates();
rates.snapshot(rb, true);
LOG.debug("RPC Client stats: {}", rb);
}
}
if (Client.isAsynchronousMode()) {
final AsyncGet<RpcWritable.Buffer, IOException> arr
= Client.getAsyncRpcResponse();

View File

@ -48,8 +48,20 @@ public class RpcDetailedMetrics {
LOG.debug(registry.info().toString());
}
RpcDetailedMetrics(String clientId) {
name = "RpcDetailedActivityForClient"+ clientId;
registry = new MetricsRegistry("rpcdetailed")
.tag("client", "RPC client", clientId);
LOG.debug(registry.info().toString());
}
public String name() { return name; }
public static RpcDetailedMetrics create(String clientId) {
RpcDetailedMetrics m = new RpcDetailedMetrics(clientId);
return DefaultMetricsSystem.instance().register(m.name, null, m);
}
public static RpcDetailedMetrics create(int port) {
RpcDetailedMetrics m = new RpcDetailedMetrics(port);
return DefaultMetricsSystem.instance().register(m.name, null, m);
@ -70,12 +82,16 @@ public class RpcDetailedMetrics {
* @param processingTime the processing time
*/
//@Override // some instrumentation interface
public void addProcessingTime(String name, int processingTime) {
rates.add(name, processingTime);
public void addProcessingTime(String metName, long processingTime) {
rates.add(metName, processingTime);
}
public void addDeferredProcessingTime(String name, long processingTime) {
deferredRpcRates.add(name, processingTime);
public void addDeferredProcessingTime(String metName, long processingTime) {
deferredRpcRates.add(metName, processingTime);
}
public MutableRatesWithAggregation getMutableRates() {
return rates;
}
/**

View File

@ -0,0 +1,133 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.contract.ContractTestUtils;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
/**
* This class tests the FileStatus API.
*/
public class TestClientMetrics {
{
GenericTestUtils.setLogLevel(ProtobufRpcEngine.LOG,
org.slf4j.event.Level.DEBUG);
}
private static final long SEED = 0xDEADBEEFL;
private static final int BLOCKSIZE = 8192;
private static final int FILESIZE = 16384;
private static final String RPC_DETAILED_METRICS =
"RpcDetailedActivityForPort";
/** Dummy port -1 is used by the client. */
private final int portNum = -1;
private static Configuration conf;
private static MiniDFSCluster cluster;
private static FileSystem fs;
private static FileContext fc;
private static DFSClient dfsClient;
private static Path file1;
@BeforeClass
public static void testSetUp() throws Exception {
conf = new HdfsConfiguration();
conf.setInt(DFSConfigKeys.DFS_LIST_LIMIT, 2);
cluster = new MiniDFSCluster.Builder(conf).build();
fs = cluster.getFileSystem();
fc = FileContext.getFileContext(cluster.getURI(0), conf);
dfsClient = new DFSClient(DFSUtilClient.getNNAddress(conf), conf);
file1 = new Path("filestatus.dat");
DFSTestUtil.createFile(fs, file1, FILESIZE, FILESIZE, BLOCKSIZE, (short) 1,
SEED);
}
@AfterClass
public static void testTearDown() throws Exception {
if (fs != null) {
fs.close();
}
if (cluster != null) {
cluster.shutdown();
}
}
/** Test for getting the metrics on the client. */
@Test
public void testGetMetrics() throws IOException {
final Logger log = LoggerFactory.getLogger(ProtobufRpcEngine.class);
GenericTestUtils.LogCapturer logCapturer =
GenericTestUtils.LogCapturer.captureLogs(log);
/** Check that / exists */
Path path = new Path("/");
assertTrue("/ should be a directory",
fs.getFileStatus(path).isDirectory());
ContractTestUtils.assertNotErasureCoded(fs, path);
/** Make sure getFileInfo returns null for files which do not exist */
HdfsFileStatus fileInfo = dfsClient.getFileInfo("/noSuchFile");
assertEquals("Non-existant file should result in null", null, fileInfo);
Path path1 = new Path("/name1");
Path path2 = new Path("/name1/name2");
assertTrue(fs.mkdirs(path1));
String metricsName = RPC_DETAILED_METRICS + portNum;
FSDataOutputStream out = fs.create(path2, false);
out.close();
fileInfo = dfsClient.getFileInfo(path1.toString());
assertEquals(1, fileInfo.getChildrenNum());
fileInfo = dfsClient.getFileInfo(path2.toString());
assertEquals(0, fileInfo.getChildrenNum());
String output = logCapturer.getOutput();
assertTrue("Unexpected output in: " + output,
output.contains("MkdirsNumOps = 1"));
assertTrue("Unexpected output in: " + output,
output.contains("CreateNumOps = 1"));
assertTrue("Unexpected output in: " + output,
output.contains("GetFileInfoNumOps = 5"));
assertCounter("CreateNumOps", 1L, getMetrics(metricsName));
assertCounter("MkdirsNumOps", 1L, getMetrics(metricsName));
assertCounter("GetFileInfoNumOps", 5L, getMetrics(metricsName));
}
}