diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt
index 837b84b34f0..6c27abe2116 100644
--- a/hadoop-common-project/hadoop-common/CHANGES.txt
+++ b/hadoop-common-project/hadoop-common/CHANGES.txt
@@ -243,6 +243,9 @@ Release 2.8.0 - UNRELEASED
HADOOP-9891. CLIMiniCluster instructions fail with MiniYarnCluster
ClassNotFoundException (Darrell Taylor via aw)
+ HADOOP-12325. RPC Metrics : Add the ability track and log slow RPCs.
+ (Anu Engineer via xyao)
+
OPTIMIZATIONS
HADOOP-11785. Reduce the number of listStatus operation in distcp
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeysPublic.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeysPublic.java
index c3190b279c6..3c603d9d3be 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeysPublic.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeysPublic.java
@@ -240,6 +240,11 @@ public class CommonConfigurationKeysPublic {
/** Default value for IPC_SERVER_MAX_CONNECTIONS_KEY */
public static final int IPC_SERVER_MAX_CONNECTIONS_DEFAULT = 0;
+ /** Logs if a RPC is really slow compared to rest of RPCs. */
+ public static final String IPC_SERVER_LOG_SLOW_RPC =
+ "ipc.server.log.slow.rpc";
+ public static final boolean IPC_SERVER_LOG_SLOW_RPC_DEFAULT = false;
+
/** See core-default.xml */
public static final String HADOOP_RPC_SOCKET_FACTORY_CLASS_DEFAULT_KEY =
"hadoop.rpc.socket.factory.class.default";
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java
index abb494ad211..df5ce5f2fdc 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java
@@ -567,7 +567,7 @@ private static ProtoClassProtoImpl getProtocolImpl(RPC.Server server,
/**
* This is a server side method, which is invoked over RPC. On success
* the return response has protobuf response payload. On failure, the
- * exception name and the stack trace are return in the resposne.
+ * exception name and the stack trace are returned in the response.
* See {@link HadoopRpcResponseProto}
*
* In this method there three types of exceptions possible and they are
@@ -637,6 +637,9 @@ public Writable call(RPC.Server server, String protocol,
server.rpcMetrics.addRpcProcessingTime(processingTime);
server.rpcDetailedMetrics.addProcessingTime(detailedMetricsName,
processingTime);
+ if (server.isLogSlowRPC()) {
+ server.logSlowRpcCalls(methodName, processingTime);
+ }
}
return new RpcResponseWrapper(result);
}
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
index baad622b13f..2092743fadc 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
@@ -384,6 +384,62 @@ public static boolean isRpcInvocation() {
private Responder responder = null;
private Handler[] handlers = null;
+ private boolean logSlowRPC = false;
+
+ /**
+ * Checks if LogSlowRPC is set true.
+ * @return
+ */
+ protected boolean isLogSlowRPC() {
+ return logSlowRPC;
+ }
+
+ /**
+ * Sets slow RPC flag.
+ * @param logSlowRPCFlag
+ */
+ @VisibleForTesting
+ protected void setLogSlowRPC(boolean logSlowRPCFlag) {
+ this.logSlowRPC = logSlowRPCFlag;
+ }
+
+
+ /**
+ * Logs a Slow RPC Request.
+ *
+ * @param methodName - RPC Request method name
+ * @param processingTime - Processing Time.
+ *
+ * if this request took too much time relative to other requests
+ * we consider that as a slow RPC. 3 is a magic number that comes
+ * from 3 sigma deviation. A very simple explanation can be found
+ * by searching for 68–95–99.7 rule. We flag an RPC as slow RPC
+ * if and only if it falls above 99.7% of requests. We start this logic
+ * only once we have enough sample size.
+ */
+ void logSlowRpcCalls(String methodName, int processingTime) {
+ final int deviation = 3;
+
+ // 1024 for minSampleSize just a guess -- not a number computed based on
+ // sample size analysis. It is chosen with the hope that this
+ // number is high enough to avoid spurious logging, yet useful
+ // in practice.
+ final int minSampleSize = 1024;
+ final double threeSigma = rpcMetrics.getProcessingMean() +
+ (rpcMetrics.getProcessingStdDev() * deviation);
+
+ if ((rpcMetrics.getProcessingSampleCount() > minSampleSize) &&
+ (processingTime > threeSigma)) {
+ if(LOG.isWarnEnabled()) {
+ String client = CurCall.get().connection.toString();
+ LOG.warn(
+ "Slow RPC : " + methodName + " took " + processingTime +
+ " milliseconds to process from client " + client);
+ }
+ rpcMetrics.incrSlowRpc();
+ }
+ }
+
/**
* A convenience method to bind to a given address and report
* better exceptions if the address is not a valid host.
@@ -2260,6 +2316,10 @@ protected Server(String bindAddress, int port,
CommonConfigurationKeysPublic.IPC_SERVER_TCPNODELAY_KEY,
CommonConfigurationKeysPublic.IPC_SERVER_TCPNODELAY_DEFAULT);
+ this.setLogSlowRPC(conf.getBoolean(
+ CommonConfigurationKeysPublic.IPC_SERVER_LOG_SLOW_RPC,
+ CommonConfigurationKeysPublic.IPC_SERVER_LOG_SLOW_RPC_DEFAULT));
+
// Create the responder here
responder = new Responder();
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java
index fa13631ebc3..cf9fd5d4e94 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java
@@ -551,6 +551,9 @@ public Writable call(org.apache.hadoop.ipc.RPC.Server server,
server.rpcMetrics.addRpcProcessingTime(processingTime);
server.rpcDetailedMetrics.addProcessingTime(detailedMetricsName,
processingTime);
+ if (server.isLogSlowRPC()) {
+ server.logSlowRpcCalls(call.getMethodName(), processingTime);
+ }
}
}
}
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/metrics/RpcMetrics.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/metrics/RpcMetrics.java
index bc9aa8946bd..5373f95e698 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/metrics/RpcMetrics.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/metrics/RpcMetrics.java
@@ -97,6 +97,8 @@ public static RpcMetrics create(Server server, Configuration conf) {
MutableCounterLong rpcAuthorizationSuccesses;
@Metric("Number of client backoff requests")
MutableCounterLong rpcClientBackoff;
+ @Metric("Number of Slow RPC calls")
+ MutableCounterLong rpcSlowCalls;
@Metric("Number of open connections") public int numOpenConnections() {
return server.getNumOpenConnections();
@@ -202,4 +204,50 @@ public void addRpcProcessingTime(int processingTime) {
public void incrClientBackoff() {
rpcClientBackoff.incr();
}
+
+ /**
+ * Increments the Slow RPC counter.
+ */
+ public void incrSlowRpc() {
+ rpcSlowCalls.incr();
+ }
+ /**
+ * Returns a MutableRate Counter.
+ * @return Mutable Rate
+ */
+ public MutableRate getRpcProcessingTime() {
+ return rpcProcessingTime;
+ }
+
+ /**
+ * Returns the number of samples that we have seen so far.
+ * @return long
+ */
+ public long getProcessingSampleCount() {
+ return rpcProcessingTime.lastStat().numSamples();
+ }
+
+ /**
+ * Returns mean of RPC Processing Times.
+ * @return double
+ */
+ public double getProcessingMean() {
+ return rpcProcessingTime.lastStat().mean();
+ }
+
+ /**
+ * Return Standard Deviation of the Processing Time.
+ * @return double
+ */
+ public double getProcessingStdDev() {
+ return rpcProcessingTime.lastStat().stddev();
+ }
+
+ /**
+ * Returns the number of slow calls.
+ * @return long
+ */
+ public long getRpcSlowCalls() {
+ return rpcSlowCalls.value();
+ }
}
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/MutableStat.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/MutableStat.java
index d323a72b235..9410c76e218 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/MutableStat.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/MutableStat.java
@@ -139,7 +139,12 @@ public synchronized void snapshot(MetricsRecordBuilder builder, boolean all) {
}
}
- private SampleStat lastStat() {
+ /**
+ * Return a SampleStat object that supports
+ * calls like StdDev and Mean.
+ * @return SampleStat
+ */
+ public SampleStat lastStat() {
return changed() ? intervalStat : prevStat;
}
diff --git a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
index 1e54e6f3c82..141fd1506e6 100644
--- a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
+++ b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
@@ -1033,6 +1033,15 @@ for ldap providers in the same way as above does.
+
+ ipc.server.log.slow.rpc
+ false
+ This setting is useful to troubleshoot performance issues for
+ various services. If this value is set to true then we log requests that
+ fall into 99th percentile as well as increment RpcSlowCalls counter.
+
+
+
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestProtoBufRpc.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestProtoBufRpc.java
index 26d5f2e7620..d9c9d6c14bc 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestProtoBufRpc.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestProtoBufRpc.java
@@ -19,6 +19,8 @@
import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
import static org.apache.hadoop.test.MetricsAsserts.assertCounterGt;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.net.InetSocketAddress;
@@ -27,7 +29,9 @@
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.ipc.metrics.RpcMetrics;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto.RpcErrorCodeProto;
+import org.apache.hadoop.ipc.protobuf.TestProtos;
import org.apache.hadoop.ipc.protobuf.TestProtos.EchoRequestProto;
import org.apache.hadoop.ipc.protobuf.TestProtos.EchoResponseProto;
import org.apache.hadoop.ipc.protobuf.TestProtos.EmptyRequestProto;
@@ -41,6 +45,7 @@
import org.junit.Before;
import org.junit.After;
+
import com.google.protobuf.BlockingService;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
@@ -56,7 +61,8 @@ public class TestProtoBufRpc {
private static InetSocketAddress addr;
private static Configuration conf;
private static RPC.Server server;
-
+ private final static int SLEEP_DURATION = 1000;
+
@ProtocolInfo(protocolName = "testProto", protocolVersion = 1)
public interface TestRpcService
extends TestProtobufRpcProto.BlockingInterface {
@@ -114,12 +120,23 @@ public EchoResponseProto echo2(RpcController unused, EchoRequestProto request)
return EchoResponseProto.newBuilder().setMessage(request.getMessage())
.build();
}
+
+ @Override
+ public TestProtos.SleepResponseProto sleep(RpcController controller,
+ TestProtos.SleepRequestProto request) throws ServiceException {
+ try{
+ Thread.sleep(request.getMilliSeconds());
+ } catch (InterruptedException ex){
+ }
+ return TestProtos.SleepResponseProto.newBuilder().build();
+ }
}
@Before
public void setUp() throws IOException { // Setup server for both protocols
conf = new Configuration();
conf.setInt(CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH, 1024);
+ conf.setBoolean(CommonConfigurationKeys.IPC_SERVER_LOG_SLOW_RPC, true);
// Set RPC engine to protobuf RPC engine
RPC.setProtocolEngine(conf, TestRpcService.class, ProtobufRpcEngine.class);
@@ -257,4 +274,62 @@ public void testExtraLongRpc() throws Exception {
// expected
}
}
+
+ @Test(timeout = 12000)
+ public void testLogSlowRPC() throws IOException, ServiceException {
+ TestRpcService2 client = getClient2();
+ // make 10 K fast calls
+ for (int x = 0; x < 10000; x++) {
+ try {
+ EmptyRequestProto emptyRequest = EmptyRequestProto.newBuilder().build();
+ client.ping2(null, emptyRequest);
+ } catch (Exception ex) {
+ throw ex;
+ }
+ }
+
+ // Ensure RPC metrics are updated
+ RpcMetrics rpcMetrics = server.getRpcMetrics();
+ assertTrue(rpcMetrics.getProcessingSampleCount() > 999L);
+ long before = rpcMetrics.getRpcSlowCalls();
+
+ // make a really slow call. Sleep sleeps for 1000ms
+ TestProtos.SleepRequestProto sleepRequest =
+ TestProtos.SleepRequestProto.newBuilder()
+ .setMilliSeconds(SLEEP_DURATION * 3).build();
+ TestProtos.SleepResponseProto Response = client.sleep(null, sleepRequest);
+
+ long after = rpcMetrics.getRpcSlowCalls();
+ // Ensure slow call is logged.
+ Assert.assertEquals(before + 1L, after);
+ }
+
+ @Test(timeout = 12000)
+ public void testEnsureNoLogIfDisabled() throws IOException, ServiceException {
+ // disable slow RPC logging
+ server.setLogSlowRPC(false);
+ TestRpcService2 client = getClient2();
+
+ // make 10 K fast calls
+ for (int x = 0; x < 10000; x++) {
+ EmptyRequestProto emptyRequest = EmptyRequestProto.newBuilder().build();
+ client.ping2(null, emptyRequest);
+ }
+
+ // Ensure RPC metrics are updated
+ RpcMetrics rpcMetrics = server.getRpcMetrics();
+ assertTrue(rpcMetrics.getProcessingSampleCount() > 999L);
+ long before = rpcMetrics.getRpcSlowCalls();
+
+ // make a really slow call. Sleep sleeps for 1000ms
+ TestProtos.SleepRequestProto sleepRequest =
+ TestProtos.SleepRequestProto.newBuilder()
+ .setMilliSeconds(SLEEP_DURATION).build();
+ TestProtos.SleepResponseProto Response = client.sleep(null, sleepRequest);
+
+ long after = rpcMetrics.getRpcSlowCalls();
+
+ // make sure we never called into Log slow RPC routine.
+ assertEquals(before, after);
+ }
}
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/MetricsAsserts.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/MetricsAsserts.java
index 6ef92b8b69e..982481e6081 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/MetricsAsserts.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/MetricsAsserts.java
@@ -87,7 +87,7 @@ public Object answer(InvocationOnMock invocation) {
* Call getMetrics on source and get a record builder mock to verify
* @param source the metrics source
* @param all if true, return all metrics even if not changed
- * @return the record builder mock to verify
+ * @return the record builder mock to verifyÏ
*/
public static MetricsRecordBuilder getMetrics(MetricsSource source,
boolean all) {
diff --git a/hadoop-common-project/hadoop-common/src/test/proto/test.proto b/hadoop-common-project/hadoop-common/src/test/proto/test.proto
index 91a2f5e4594..9965f24485e 100644
--- a/hadoop-common-project/hadoop-common/src/test/proto/test.proto
+++ b/hadoop-common-project/hadoop-common/src/test/proto/test.proto
@@ -34,3 +34,10 @@ message EchoRequestProto {
message EchoResponseProto {
required string message = 1;
}
+
+message SleepRequestProto{
+ required int32 milliSeconds = 1;
+}
+
+message SleepResponseProto{
+}
\ No newline at end of file
diff --git a/hadoop-common-project/hadoop-common/src/test/proto/test_rpc_service.proto b/hadoop-common-project/hadoop-common/src/test/proto/test_rpc_service.proto
index 1d54e45d5a7..4f64088531f 100644
--- a/hadoop-common-project/hadoop-common/src/test/proto/test_rpc_service.proto
+++ b/hadoop-common-project/hadoop-common/src/test/proto/test_rpc_service.proto
@@ -37,4 +37,5 @@ service TestProtobufRpcProto {
service TestProtobufRpc2Proto {
rpc ping2(EmptyRequestProto) returns (EmptyResponseProto);
rpc echo2(EchoRequestProto) returns (EchoResponseProto);
+ rpc sleep(SleepRequestProto) returns (SleepResponseProto);
}