HADOOP-12325. RPC Metrics : Add the ability track and log slow RPCs. Contributed by Anu Engineer
(cherry picked from commit 48774d0a45
)
Conflicts:
hadoop-common-project/hadoop-common/CHANGES.txt
This commit is contained in:
parent
95f8e93691
commit
7a0a31586a
|
@ -243,6 +243,9 @@ Release 2.8.0 - UNRELEASED
|
|||
HADOOP-9891. CLIMiniCluster instructions fail with MiniYarnCluster
|
||||
ClassNotFoundException (Darrell Taylor via aw)
|
||||
|
||||
HADOOP-12325. RPC Metrics : Add the ability track and log slow RPCs.
|
||||
(Anu Engineer via xyao)
|
||||
|
||||
OPTIMIZATIONS
|
||||
|
||||
HADOOP-11785. Reduce the number of listStatus operation in distcp
|
||||
|
|
|
@ -240,6 +240,11 @@ public class CommonConfigurationKeysPublic {
|
|||
/** Default value for IPC_SERVER_MAX_CONNECTIONS_KEY */
|
||||
public static final int IPC_SERVER_MAX_CONNECTIONS_DEFAULT = 0;
|
||||
|
||||
/** Logs if a RPC is really slow compared to rest of RPCs. */
|
||||
public static final String IPC_SERVER_LOG_SLOW_RPC =
|
||||
"ipc.server.log.slow.rpc";
|
||||
public static final boolean IPC_SERVER_LOG_SLOW_RPC_DEFAULT = false;
|
||||
|
||||
/** See <a href="{@docRoot}/../core-default.html">core-default.xml</a> */
|
||||
public static final String HADOOP_RPC_SOCKET_FACTORY_CLASS_DEFAULT_KEY =
|
||||
"hadoop.rpc.socket.factory.class.default";
|
||||
|
|
|
@ -567,7 +567,7 @@ public class ProtobufRpcEngine implements RpcEngine {
|
|||
/**
|
||||
* This is a server side method, which is invoked over RPC. On success
|
||||
* the return response has protobuf response payload. On failure, the
|
||||
* exception name and the stack trace are return in the resposne.
|
||||
* exception name and the stack trace are returned in the response.
|
||||
* See {@link HadoopRpcResponseProto}
|
||||
*
|
||||
* In this method there three types of exceptions possible and they are
|
||||
|
@ -637,6 +637,9 @@ public class ProtobufRpcEngine implements RpcEngine {
|
|||
server.rpcMetrics.addRpcProcessingTime(processingTime);
|
||||
server.rpcDetailedMetrics.addProcessingTime(detailedMetricsName,
|
||||
processingTime);
|
||||
if (server.isLogSlowRPC()) {
|
||||
server.logSlowRpcCalls(methodName, processingTime);
|
||||
}
|
||||
}
|
||||
return new RpcResponseWrapper(result);
|
||||
}
|
||||
|
|
|
@ -384,6 +384,62 @@ public abstract class Server {
|
|||
private Responder responder = null;
|
||||
private Handler[] handlers = null;
|
||||
|
||||
private boolean logSlowRPC = false;
|
||||
|
||||
/**
|
||||
* Checks if LogSlowRPC is set true.
|
||||
* @return
|
||||
*/
|
||||
protected boolean isLogSlowRPC() {
|
||||
return logSlowRPC;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets slow RPC flag.
|
||||
* @param logSlowRPCFlag
|
||||
*/
|
||||
@VisibleForTesting
|
||||
protected void setLogSlowRPC(boolean logSlowRPCFlag) {
|
||||
this.logSlowRPC = logSlowRPCFlag;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Logs a Slow RPC Request.
|
||||
*
|
||||
* @param methodName - RPC Request method name
|
||||
* @param processingTime - Processing Time.
|
||||
*
|
||||
* if this request took too much time relative to other requests
|
||||
* we consider that as a slow RPC. 3 is a magic number that comes
|
||||
* from 3 sigma deviation. A very simple explanation can be found
|
||||
* by searching for 68–95–99.7 rule. We flag an RPC as slow RPC
|
||||
* if and only if it falls above 99.7% of requests. We start this logic
|
||||
* only once we have enough sample size.
|
||||
*/
|
||||
void logSlowRpcCalls(String methodName, int processingTime) {
|
||||
final int deviation = 3;
|
||||
|
||||
// 1024 for minSampleSize just a guess -- not a number computed based on
|
||||
// sample size analysis. It is chosen with the hope that this
|
||||
// number is high enough to avoid spurious logging, yet useful
|
||||
// in practice.
|
||||
final int minSampleSize = 1024;
|
||||
final double threeSigma = rpcMetrics.getProcessingMean() +
|
||||
(rpcMetrics.getProcessingStdDev() * deviation);
|
||||
|
||||
if ((rpcMetrics.getProcessingSampleCount() > minSampleSize) &&
|
||||
(processingTime > threeSigma)) {
|
||||
if(LOG.isWarnEnabled()) {
|
||||
String client = CurCall.get().connection.toString();
|
||||
LOG.warn(
|
||||
"Slow RPC : " + methodName + " took " + processingTime +
|
||||
" milliseconds to process from client " + client);
|
||||
}
|
||||
rpcMetrics.incrSlowRpc();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* A convenience method to bind to a given address and report
|
||||
* better exceptions if the address is not a valid host.
|
||||
|
@ -2260,6 +2316,10 @@ public abstract class Server {
|
|||
CommonConfigurationKeysPublic.IPC_SERVER_TCPNODELAY_KEY,
|
||||
CommonConfigurationKeysPublic.IPC_SERVER_TCPNODELAY_DEFAULT);
|
||||
|
||||
this.setLogSlowRPC(conf.getBoolean(
|
||||
CommonConfigurationKeysPublic.IPC_SERVER_LOG_SLOW_RPC,
|
||||
CommonConfigurationKeysPublic.IPC_SERVER_LOG_SLOW_RPC_DEFAULT));
|
||||
|
||||
// Create the responder here
|
||||
responder = new Responder();
|
||||
|
||||
|
|
|
@ -551,6 +551,9 @@ public class WritableRpcEngine implements RpcEngine {
|
|||
server.rpcMetrics.addRpcProcessingTime(processingTime);
|
||||
server.rpcDetailedMetrics.addProcessingTime(detailedMetricsName,
|
||||
processingTime);
|
||||
if (server.isLogSlowRPC()) {
|
||||
server.logSlowRpcCalls(call.getMethodName(), processingTime);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -97,6 +97,8 @@ public class RpcMetrics {
|
|||
MutableCounterLong rpcAuthorizationSuccesses;
|
||||
@Metric("Number of client backoff requests")
|
||||
MutableCounterLong rpcClientBackoff;
|
||||
@Metric("Number of Slow RPC calls")
|
||||
MutableCounterLong rpcSlowCalls;
|
||||
|
||||
@Metric("Number of open connections") public int numOpenConnections() {
|
||||
return server.getNumOpenConnections();
|
||||
|
@ -202,4 +204,50 @@ public class RpcMetrics {
|
|||
public void incrClientBackoff() {
|
||||
rpcClientBackoff.incr();
|
||||
}
|
||||
|
||||
/**
|
||||
* Increments the Slow RPC counter.
|
||||
*/
|
||||
public void incrSlowRpc() {
|
||||
rpcSlowCalls.incr();
|
||||
}
|
||||
/**
|
||||
* Returns a MutableRate Counter.
|
||||
* @return Mutable Rate
|
||||
*/
|
||||
public MutableRate getRpcProcessingTime() {
|
||||
return rpcProcessingTime;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the number of samples that we have seen so far.
|
||||
* @return long
|
||||
*/
|
||||
public long getProcessingSampleCount() {
|
||||
return rpcProcessingTime.lastStat().numSamples();
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns mean of RPC Processing Times.
|
||||
* @return double
|
||||
*/
|
||||
public double getProcessingMean() {
|
||||
return rpcProcessingTime.lastStat().mean();
|
||||
}
|
||||
|
||||
/**
|
||||
* Return Standard Deviation of the Processing Time.
|
||||
* @return double
|
||||
*/
|
||||
public double getProcessingStdDev() {
|
||||
return rpcProcessingTime.lastStat().stddev();
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the number of slow calls.
|
||||
* @return long
|
||||
*/
|
||||
public long getRpcSlowCalls() {
|
||||
return rpcSlowCalls.value();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -139,7 +139,12 @@ public class MutableStat extends MutableMetric {
|
|||
}
|
||||
}
|
||||
|
||||
private SampleStat lastStat() {
|
||||
/**
|
||||
* Return a SampleStat object that supports
|
||||
* calls like StdDev and Mean.
|
||||
* @return SampleStat
|
||||
*/
|
||||
public SampleStat lastStat() {
|
||||
return changed() ? intervalStat : prevStat;
|
||||
}
|
||||
|
||||
|
|
|
@ -1033,6 +1033,15 @@ for ldap providers in the same way as above does.
|
|||
</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>ipc.server.log.slow.rpc</name>
|
||||
<value>false</value>
|
||||
<description>This setting is useful to troubleshoot performance issues for
|
||||
various services. If this value is set to true then we log requests that
|
||||
fall into 99th percentile as well as increment RpcSlowCalls counter.
|
||||
</description>
|
||||
</property>
|
||||
|
||||
<!-- Proxy Configuration -->
|
||||
|
||||
<property>
|
||||
|
|
|
@ -19,6 +19,8 @@ package org.apache.hadoop.ipc;
|
|||
|
||||
import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
|
||||
import static org.apache.hadoop.test.MetricsAsserts.assertCounterGt;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
|
@ -27,7 +29,9 @@ import java.net.URISyntaxException;
|
|||
import org.apache.commons.lang.StringUtils;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.CommonConfigurationKeys;
|
||||
import org.apache.hadoop.ipc.metrics.RpcMetrics;
|
||||
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto.RpcErrorCodeProto;
|
||||
import org.apache.hadoop.ipc.protobuf.TestProtos;
|
||||
import org.apache.hadoop.ipc.protobuf.TestProtos.EchoRequestProto;
|
||||
import org.apache.hadoop.ipc.protobuf.TestProtos.EchoResponseProto;
|
||||
import org.apache.hadoop.ipc.protobuf.TestProtos.EmptyRequestProto;
|
||||
|
@ -41,6 +45,7 @@ import org.junit.Test;
|
|||
import org.junit.Before;
|
||||
import org.junit.After;
|
||||
|
||||
|
||||
import com.google.protobuf.BlockingService;
|
||||
import com.google.protobuf.RpcController;
|
||||
import com.google.protobuf.ServiceException;
|
||||
|
@ -56,6 +61,7 @@ public class TestProtoBufRpc {
|
|||
private static InetSocketAddress addr;
|
||||
private static Configuration conf;
|
||||
private static RPC.Server server;
|
||||
private final static int SLEEP_DURATION = 1000;
|
||||
|
||||
@ProtocolInfo(protocolName = "testProto", protocolVersion = 1)
|
||||
public interface TestRpcService
|
||||
|
@ -114,12 +120,23 @@ public class TestProtoBufRpc {
|
|||
return EchoResponseProto.newBuilder().setMessage(request.getMessage())
|
||||
.build();
|
||||
}
|
||||
|
||||
@Override
|
||||
public TestProtos.SleepResponseProto sleep(RpcController controller,
|
||||
TestProtos.SleepRequestProto request) throws ServiceException {
|
||||
try{
|
||||
Thread.sleep(request.getMilliSeconds());
|
||||
} catch (InterruptedException ex){
|
||||
}
|
||||
return TestProtos.SleepResponseProto.newBuilder().build();
|
||||
}
|
||||
}
|
||||
|
||||
@Before
|
||||
public void setUp() throws IOException { // Setup server for both protocols
|
||||
conf = new Configuration();
|
||||
conf.setInt(CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH, 1024);
|
||||
conf.setBoolean(CommonConfigurationKeys.IPC_SERVER_LOG_SLOW_RPC, true);
|
||||
// Set RPC engine to protobuf RPC engine
|
||||
RPC.setProtocolEngine(conf, TestRpcService.class, ProtobufRpcEngine.class);
|
||||
|
||||
|
@ -257,4 +274,62 @@ public class TestProtoBufRpc {
|
|||
// expected
|
||||
}
|
||||
}
|
||||
|
||||
@Test(timeout = 12000)
|
||||
public void testLogSlowRPC() throws IOException, ServiceException {
|
||||
TestRpcService2 client = getClient2();
|
||||
// make 10 K fast calls
|
||||
for (int x = 0; x < 10000; x++) {
|
||||
try {
|
||||
EmptyRequestProto emptyRequest = EmptyRequestProto.newBuilder().build();
|
||||
client.ping2(null, emptyRequest);
|
||||
} catch (Exception ex) {
|
||||
throw ex;
|
||||
}
|
||||
}
|
||||
|
||||
// Ensure RPC metrics are updated
|
||||
RpcMetrics rpcMetrics = server.getRpcMetrics();
|
||||
assertTrue(rpcMetrics.getProcessingSampleCount() > 999L);
|
||||
long before = rpcMetrics.getRpcSlowCalls();
|
||||
|
||||
// make a really slow call. Sleep sleeps for 1000ms
|
||||
TestProtos.SleepRequestProto sleepRequest =
|
||||
TestProtos.SleepRequestProto.newBuilder()
|
||||
.setMilliSeconds(SLEEP_DURATION * 3).build();
|
||||
TestProtos.SleepResponseProto Response = client.sleep(null, sleepRequest);
|
||||
|
||||
long after = rpcMetrics.getRpcSlowCalls();
|
||||
// Ensure slow call is logged.
|
||||
Assert.assertEquals(before + 1L, after);
|
||||
}
|
||||
|
||||
@Test(timeout = 12000)
|
||||
public void testEnsureNoLogIfDisabled() throws IOException, ServiceException {
|
||||
// disable slow RPC logging
|
||||
server.setLogSlowRPC(false);
|
||||
TestRpcService2 client = getClient2();
|
||||
|
||||
// make 10 K fast calls
|
||||
for (int x = 0; x < 10000; x++) {
|
||||
EmptyRequestProto emptyRequest = EmptyRequestProto.newBuilder().build();
|
||||
client.ping2(null, emptyRequest);
|
||||
}
|
||||
|
||||
// Ensure RPC metrics are updated
|
||||
RpcMetrics rpcMetrics = server.getRpcMetrics();
|
||||
assertTrue(rpcMetrics.getProcessingSampleCount() > 999L);
|
||||
long before = rpcMetrics.getRpcSlowCalls();
|
||||
|
||||
// make a really slow call. Sleep sleeps for 1000ms
|
||||
TestProtos.SleepRequestProto sleepRequest =
|
||||
TestProtos.SleepRequestProto.newBuilder()
|
||||
.setMilliSeconds(SLEEP_DURATION).build();
|
||||
TestProtos.SleepResponseProto Response = client.sleep(null, sleepRequest);
|
||||
|
||||
long after = rpcMetrics.getRpcSlowCalls();
|
||||
|
||||
// make sure we never called into Log slow RPC routine.
|
||||
assertEquals(before, after);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -87,7 +87,7 @@ public class MetricsAsserts {
|
|||
* Call getMetrics on source and get a record builder mock to verify
|
||||
* @param source the metrics source
|
||||
* @param all if true, return all metrics even if not changed
|
||||
* @return the record builder mock to verify
|
||||
* @return the record builder mock to verifyÏ
|
||||
*/
|
||||
public static MetricsRecordBuilder getMetrics(MetricsSource source,
|
||||
boolean all) {
|
||||
|
|
|
@ -34,3 +34,10 @@ message EchoRequestProto {
|
|||
message EchoResponseProto {
|
||||
required string message = 1;
|
||||
}
|
||||
|
||||
message SleepRequestProto{
|
||||
required int32 milliSeconds = 1;
|
||||
}
|
||||
|
||||
message SleepResponseProto{
|
||||
}
|
|
@ -37,4 +37,5 @@ service TestProtobufRpcProto {
|
|||
service TestProtobufRpc2Proto {
|
||||
rpc ping2(EmptyRequestProto) returns (EmptyResponseProto);
|
||||
rpc echo2(EchoRequestProto) returns (EchoResponseProto);
|
||||
rpc sleep(SleepRequestProto) returns (SleepResponseProto);
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue