HADOOP-12325. RPC Metrics : Add the ability track and log slow RPCs. Contributed by Anu Engineer

This commit is contained in:
Xiaoyu Yao 2015-08-24 14:31:24 -07:00
parent b5ce87f84d
commit 48774d0a45
12 changed files with 223 additions and 4 deletions

View File

@ -753,6 +753,9 @@ Release 2.8.0 - UNRELEASED
HADOOP-12050. Enable MaxInactiveInterval for hadoop http auth token
(hzlu via benoyantony)
HADOOP-12325. RPC Metrics : Add the ability track and log slow RPCs.
(Anu Engineer via xyao)
OPTIMIZATIONS
HADOOP-11785. Reduce the number of listStatus operation in distcp

View File

@ -235,6 +235,11 @@ public class CommonConfigurationKeysPublic {
/** Default value for IPC_SERVER_MAX_CONNECTIONS_KEY */
public static final int IPC_SERVER_MAX_CONNECTIONS_DEFAULT = 0;
/** Logs if a RPC is really slow compared to rest of RPCs. */
public static final String IPC_SERVER_LOG_SLOW_RPC =
"ipc.server.log.slow.rpc";
public static final boolean IPC_SERVER_LOG_SLOW_RPC_DEFAULT = false;
/** See <a href="{@docRoot}/../core-default.html">core-default.xml</a> */
public static final String HADOOP_RPC_SOCKET_FACTORY_CLASS_DEFAULT_KEY =
"hadoop.rpc.socket.factory.class.default";

View File

@ -567,7 +567,7 @@ public class ProtobufRpcEngine implements RpcEngine {
/**
* This is a server side method, which is invoked over RPC. On success
* the return response has protobuf response payload. On failure, the
* exception name and the stack trace are return in the resposne.
* exception name and the stack trace are returned in the response.
* See {@link HadoopRpcResponseProto}
*
* In this method there three types of exceptions possible and they are
@ -657,6 +657,9 @@ public class ProtobufRpcEngine implements RpcEngine {
server.rpcMetrics.addRpcProcessingTime(processingTime);
server.rpcDetailedMetrics.addProcessingTime(detailedMetricsName,
processingTime);
if (server.isLogSlowRPC()) {
server.logSlowRpcCalls(methodName, processingTime);
}
}
return new RpcResponseWrapper(result);
}

View File

@ -387,6 +387,62 @@ public abstract class Server {
private Responder responder = null;
private Handler[] handlers = null;
private boolean logSlowRPC = false;
/**
* Checks if LogSlowRPC is set true.
* @return
*/
protected boolean isLogSlowRPC() {
return logSlowRPC;
}
/**
* Sets slow RPC flag.
* @param logSlowRPCFlag
*/
@VisibleForTesting
protected void setLogSlowRPC(boolean logSlowRPCFlag) {
this.logSlowRPC = logSlowRPCFlag;
}
/**
* Logs a Slow RPC Request.
*
* @param methodName - RPC Request method name
* @param processingTime - Processing Time.
*
* if this request took too much time relative to other requests
* we consider that as a slow RPC. 3 is a magic number that comes
* from 3 sigma deviation. A very simple explanation can be found
* by searching for 689599.7 rule. We flag an RPC as slow RPC
* if and only if it falls above 99.7% of requests. We start this logic
* only once we have enough sample size.
*/
void logSlowRpcCalls(String methodName, int processingTime) {
final int deviation = 3;
// 1024 for minSampleSize just a guess -- not a number computed based on
// sample size analysis. It is chosen with the hope that this
// number is high enough to avoid spurious logging, yet useful
// in practice.
final int minSampleSize = 1024;
final double threeSigma = rpcMetrics.getProcessingMean() +
(rpcMetrics.getProcessingStdDev() * deviation);
if ((rpcMetrics.getProcessingSampleCount() > minSampleSize) &&
(processingTime > threeSigma)) {
if(LOG.isWarnEnabled()) {
String client = CurCall.get().connection.toString();
LOG.warn(
"Slow RPC : " + methodName + " took " + processingTime +
" milliseconds to process from client " + client);
}
rpcMetrics.incrSlowRpc();
}
}
/**
* A convenience method to bind to a given address and report
* better exceptions if the address is not a valid host.
@ -2346,6 +2402,10 @@ public abstract class Server {
CommonConfigurationKeysPublic.IPC_SERVER_TCPNODELAY_KEY,
CommonConfigurationKeysPublic.IPC_SERVER_TCPNODELAY_DEFAULT);
this.setLogSlowRPC(conf.getBoolean(
CommonConfigurationKeysPublic.IPC_SERVER_LOG_SLOW_RPC,
CommonConfigurationKeysPublic.IPC_SERVER_LOG_SLOW_RPC_DEFAULT));
// Create the responder here
responder = new Responder();

View File

@ -551,6 +551,9 @@ public class WritableRpcEngine implements RpcEngine {
server.rpcMetrics.addRpcProcessingTime(processingTime);
server.rpcDetailedMetrics.addProcessingTime(detailedMetricsName,
processingTime);
if (server.isLogSlowRPC()) {
server.logSlowRpcCalls(call.getMethodName(), processingTime);
}
}
}
}

View File

@ -97,6 +97,8 @@ public class RpcMetrics {
MutableCounterLong rpcAuthorizationSuccesses;
@Metric("Number of client backoff requests")
MutableCounterLong rpcClientBackoff;
@Metric("Number of Slow RPC calls")
MutableCounterLong rpcSlowCalls;
@Metric("Number of open connections") public int numOpenConnections() {
return server.getNumOpenConnections();
@ -202,4 +204,50 @@ public class RpcMetrics {
public void incrClientBackoff() {
rpcClientBackoff.incr();
}
/**
* Increments the Slow RPC counter.
*/
public void incrSlowRpc() {
rpcSlowCalls.incr();
}
/**
* Returns a MutableRate Counter.
* @return Mutable Rate
*/
public MutableRate getRpcProcessingTime() {
return rpcProcessingTime;
}
/**
* Returns the number of samples that we have seen so far.
* @return long
*/
public long getProcessingSampleCount() {
return rpcProcessingTime.lastStat().numSamples();
}
/**
* Returns mean of RPC Processing Times.
* @return double
*/
public double getProcessingMean() {
return rpcProcessingTime.lastStat().mean();
}
/**
* Return Standard Deviation of the Processing Time.
* @return double
*/
public double getProcessingStdDev() {
return rpcProcessingTime.lastStat().stddev();
}
/**
* Returns the number of slow calls.
* @return long
*/
public long getRpcSlowCalls() {
return rpcSlowCalls.value();
}
}

View File

@ -140,7 +140,12 @@ public class MutableStat extends MutableMetric {
}
}
private SampleStat lastStat() {
/**
* Return a SampleStat object that supports
* calls like StdDev and Mean.
* @return SampleStat
*/
public SampleStat lastStat() {
return changed() ? intervalStat : prevStat;
}

View File

@ -1032,6 +1032,15 @@ for ldap providers in the same way as above does.
</description>
</property>
<property>
<name>ipc.server.log.slow.rpc</name>
<value>false</value>
<description>This setting is useful to troubleshoot performance issues for
various services. If this value is set to true then we log requests that
fall into 99th percentile as well as increment RpcSlowCalls counter.
</description>
</property>
<!-- Proxy Configuration -->
<property>

View File

@ -19,6 +19,8 @@ package org.apache.hadoop.ipc;
import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
import static org.apache.hadoop.test.MetricsAsserts.assertCounterGt;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.net.InetSocketAddress;
@ -27,7 +29,9 @@ import java.net.URISyntaxException;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.ipc.metrics.RpcMetrics;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto.RpcErrorCodeProto;
import org.apache.hadoop.ipc.protobuf.TestProtos;
import org.apache.hadoop.ipc.protobuf.TestProtos.EchoRequestProto;
import org.apache.hadoop.ipc.protobuf.TestProtos.EchoResponseProto;
import org.apache.hadoop.ipc.protobuf.TestProtos.EmptyRequestProto;
@ -41,6 +45,7 @@ import org.junit.Test;
import org.junit.Before;
import org.junit.After;
import com.google.protobuf.BlockingService;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
@ -56,7 +61,8 @@ public class TestProtoBufRpc {
private static InetSocketAddress addr;
private static Configuration conf;
private static RPC.Server server;
private final static int SLEEP_DURATION = 1000;
@ProtocolInfo(protocolName = "testProto", protocolVersion = 1)
public interface TestRpcService
extends TestProtobufRpcProto.BlockingInterface {
@ -114,12 +120,23 @@ public class TestProtoBufRpc {
return EchoResponseProto.newBuilder().setMessage(request.getMessage())
.build();
}
@Override
public TestProtos.SleepResponseProto sleep(RpcController controller,
TestProtos.SleepRequestProto request) throws ServiceException {
try{
Thread.sleep(request.getMilliSeconds());
} catch (InterruptedException ex){
}
return TestProtos.SleepResponseProto.newBuilder().build();
}
}
@Before
public void setUp() throws IOException { // Setup server for both protocols
conf = new Configuration();
conf.setInt(CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH, 1024);
conf.setBoolean(CommonConfigurationKeys.IPC_SERVER_LOG_SLOW_RPC, true);
// Set RPC engine to protobuf RPC engine
RPC.setProtocolEngine(conf, TestRpcService.class, ProtobufRpcEngine.class);
@ -257,4 +274,62 @@ public class TestProtoBufRpc {
// expected
}
}
@Test(timeout = 12000)
public void testLogSlowRPC() throws IOException, ServiceException {
TestRpcService2 client = getClient2();
// make 10 K fast calls
for (int x = 0; x < 10000; x++) {
try {
EmptyRequestProto emptyRequest = EmptyRequestProto.newBuilder().build();
client.ping2(null, emptyRequest);
} catch (Exception ex) {
throw ex;
}
}
// Ensure RPC metrics are updated
RpcMetrics rpcMetrics = server.getRpcMetrics();
assertTrue(rpcMetrics.getProcessingSampleCount() > 999L);
long before = rpcMetrics.getRpcSlowCalls();
// make a really slow call. Sleep sleeps for 1000ms
TestProtos.SleepRequestProto sleepRequest =
TestProtos.SleepRequestProto.newBuilder()
.setMilliSeconds(SLEEP_DURATION * 3).build();
TestProtos.SleepResponseProto Response = client.sleep(null, sleepRequest);
long after = rpcMetrics.getRpcSlowCalls();
// Ensure slow call is logged.
Assert.assertEquals(before + 1L, after);
}
@Test(timeout = 12000)
public void testEnsureNoLogIfDisabled() throws IOException, ServiceException {
// disable slow RPC logging
server.setLogSlowRPC(false);
TestRpcService2 client = getClient2();
// make 10 K fast calls
for (int x = 0; x < 10000; x++) {
EmptyRequestProto emptyRequest = EmptyRequestProto.newBuilder().build();
client.ping2(null, emptyRequest);
}
// Ensure RPC metrics are updated
RpcMetrics rpcMetrics = server.getRpcMetrics();
assertTrue(rpcMetrics.getProcessingSampleCount() > 999L);
long before = rpcMetrics.getRpcSlowCalls();
// make a really slow call. Sleep sleeps for 1000ms
TestProtos.SleepRequestProto sleepRequest =
TestProtos.SleepRequestProto.newBuilder()
.setMilliSeconds(SLEEP_DURATION).build();
TestProtos.SleepResponseProto Response = client.sleep(null, sleepRequest);
long after = rpcMetrics.getRpcSlowCalls();
// make sure we never called into Log slow RPC routine.
assertEquals(before, after);
}
}

View File

@ -87,7 +87,7 @@ public class MetricsAsserts {
* Call getMetrics on source and get a record builder mock to verify
* @param source the metrics source
* @param all if true, return all metrics even if not changed
* @return the record builder mock to verify
* @return the record builder mock to verifyÏ
*/
public static MetricsRecordBuilder getMetrics(MetricsSource source,
boolean all) {

View File

@ -34,3 +34,10 @@ message EchoRequestProto {
message EchoResponseProto {
required string message = 1;
}
message SleepRequestProto{
required int32 milliSeconds = 1;
}
message SleepResponseProto{
}

View File

@ -37,4 +37,5 @@ service TestProtobufRpcProto {
service TestProtobufRpc2Proto {
rpc ping2(EmptyRequestProto) returns (EmptyResponseProto);
rpc echo2(EchoRequestProto) returns (EchoResponseProto);
rpc sleep(SleepRequestProto) returns (SleepResponseProto);
}