diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
index 451ce2619b9..04146abcf66 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
@@ -600,17 +600,18 @@ public abstract class Server {
     }
   }
 
-  void updateMetrics(Call call, long startTime, boolean connDropped) {
+  void updateMetrics(Call call, long processingStartTime, boolean connDropped) {
     totalRequests.increment();
     // delta = handler + processing + response
-    long deltaNanos = Time.monotonicNowNanos() - startTime;
-    long timestampNanos = call.timestampNanos;
+    long completionTime = Time.monotonicNowNanos();
+    long deltaNanos = completionTime - processingStartTime;
+    long arrivalTime = call.timestampNanos;
 
     ProcessingDetails details = call.getProcessingDetails();
     // queue time is the delta between when the call first arrived and when it
     // began being serviced, minus the time it took to be put into the queue
     details.set(Timing.QUEUE,
-        startTime - timestampNanos - details.get(Timing.ENQUEUE));
+        processingStartTime - arrivalTime - details.get(Timing.ENQUEUE));
     deltaNanos -= details.get(Timing.PROCESSING);
     deltaNanos -= details.get(Timing.RESPONSE);
     details.set(Timing.HANDLER, deltaNanos);
@@ -636,6 +637,10 @@ public abstract class Server {
     processingTime -= waitTime;
     String name = call.getDetailedMetricsName();
     rpcDetailedMetrics.addProcessingTime(name, processingTime);
+    // Overall processing time is from arrival to completion.
+    rpcDetailedMetrics.addOverallProcessingTime(name,
+        rpcMetrics.getMetricsTimeUnit().convert(completionTime - arrivalTime,
+            TimeUnit.NANOSECONDS));
     callQueue.addResponseTime(name, call, details);
     if (isLogSlowRPC()) {
       logSlowRpcCalls(name, call, details);
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/metrics/RpcDetailedMetrics.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/metrics/RpcDetailedMetrics.java
index 98b9f262b85..d8d35ae7a8a 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/metrics/RpcDetailedMetrics.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/metrics/RpcDetailedMetrics.java
@@ -26,6 +26,9 @@ import org.apache.hadoop.metrics2.lib.MutableRatesWithAggregation;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.commons.lang3.StringUtils.capitalize;
+
+
 /**
  * This class is for maintaining RPC method related statistics
  * and publishing them through the metrics interfaces.
@@ -33,14 +36,26 @@ import org.slf4j.LoggerFactory;
 @InterfaceAudience.Private
 @Metrics(about="Per method RPC metrics", context="rpcdetailed")
 public class RpcDetailedMetrics {
+  static final String OVERALL_PROCESSING_PREFIX = "Overall";
 
+  // per-method RPC processing time
   @Metric MutableRatesWithAggregation rates;
   @Metric MutableRatesWithAggregation deferredRpcRates;
+  /**
+   * per-method overall RPC processing time, from request arrival to when the
+   * response is sent back.
+   */
+  @Metric MutableRatesWithAggregation overallRpcProcessingRates;
 
   static final Logger LOG = LoggerFactory.getLogger(RpcDetailedMetrics.class);
   final MetricsRegistry registry;
   final String name;
 
+  // Mainly to facilitate testing in TestRPC.java
+  public MutableRatesWithAggregation getOverallRpcProcessingRates() {
+    return overallRpcProcessingRates;
+  }
+
   RpcDetailedMetrics(int port) {
     name = "RpcDetailedActivityForPort"+ port;
     registry = new MetricsRegistry("rpcdetailed")
@@ -62,6 +77,7 @@ public class RpcDetailedMetrics {
   public void init(Class protocol) {
     rates.init(protocol);
     deferredRpcRates.init(protocol, "Deferred");
+    overallRpcProcessingRates.init(protocol);
   }
 
   /**
@@ -78,6 +94,16 @@
     deferredRpcRates.add(name, processingTime);
   }
 
+  /**
+   * Add an overall RPC processing time sample
+   * @param rpcCallName of the RPC call
+   * @param overallProcessingTime the overall RPC processing time
+   */
+  public void addOverallProcessingTime(String rpcCallName, long overallProcessingTime) {
+    String metric = OVERALL_PROCESSING_PREFIX + capitalize(rpcCallName);
+    overallRpcProcessingRates.add(metric, overallProcessingTime);
+  }
+
   /**
    * Shutdown the instrumentation for the process
    */
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java
index 6e5cb7059c6..e1deab5afe1 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java
@@ -39,6 +39,7 @@ import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto.Rpc
 import org.apache.hadoop.ipc.protobuf.TestProtos;
 import org.apache.hadoop.metrics2.MetricsRecordBuilder;
 import org.apache.hadoop.metrics2.lib.MutableCounterLong;
+import org.apache.hadoop.metrics2.lib.MutableRatesWithAggregation;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.SecurityUtil;
@@ -95,6 +96,8 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.ReentrantLock;
 
+import static org.apache.hadoop.test.MetricsAsserts.assertGaugeGt;
+import static org.apache.hadoop.test.MetricsAsserts.mockMetricsRecordBuilder;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
 import static org.apache.hadoop.test.MetricsAsserts.assertCounterGt;
@@ -1441,6 +1444,41 @@ public class TestRPC extends TestRpcBase {
     }
   }
 
+  /**
+   * Test per-type overall RPC processing time metric
+   */
+  @Test
+  public void testOverallRpcProcessingTimeMetric() throws Exception {
+    final Server server;
+    TestRpcService proxy = null;
+
+    server = setupTestServer(conf, 5);
+    try {
+      proxy = getClient(addr, conf);
+
+      // Send 1 ping request and 2 lockAndSleep requests
+      proxy.ping(null, newEmptyRequest());
+      proxy.lockAndSleep(null, newSleepRequest(10));
+      proxy.lockAndSleep(null, newSleepRequest(12));
+
+      MetricsRecordBuilder rb = mockMetricsRecordBuilder();
+      MutableRatesWithAggregation rates =
+          server.rpcDetailedMetrics.getOverallRpcProcessingRates();
+      rates.snapshot(rb, true);
+
+      // Verify the ping request. AvgTime should be non-zero.
+      assertCounter("OverallPingNumOps", 1L, rb);
+      assertGaugeGt("OverallPingAvgTime", 0, rb);
+
+      // Verify lockAndSleep requests. AvgTime should be greater than 10 ms,
+      // since we sleep for 10 and 12 ms respectively.
+      assertCounter("OverallLockAndSleepNumOps", 2L, rb);
+      assertGaugeGt("OverallLockAndSleepAvgTime", 10.0, rb);
+
+    } finally {
+      stop(server, proxy);
+    }
+  }
   /**
    * Test RPC backoff by queue full.
    */
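
Not part of the patch above, only an illustration of the naming convention it introduces: addOverallProcessingTime composes the metric name from the "Overall" prefix and the capitalized RPC call name, and the rates aggregation then publishes it as <name>NumOps and <name>AvgTime, which is what the new test asserts. A minimal standalone sketch (the class and method names here are hypothetical; only StringUtils.capitalize from commons-lang3 is assumed):

// Illustration only, not part of the patch: mirrors how
// RpcDetailedMetrics#addOverallProcessingTime derives the per-method
// metric name ("Overall" prefix + capitalized RPC call name).
import static org.apache.commons.lang3.StringUtils.capitalize;

public class OverallMetricNameDemo {
  static final String OVERALL_PROCESSING_PREFIX = "Overall";

  static String overallMetricName(String rpcCallName) {
    return OVERALL_PROCESSING_PREFIX + capitalize(rpcCallName);
  }

  public static void main(String[] args) {
    // These match the names asserted in testOverallRpcProcessingTimeMetric.
    System.out.println(overallMetricName("ping") + "NumOps");          // OverallPingNumOps
    System.out.println(overallMetricName("lockAndSleep") + "AvgTime"); // OverallLockAndSleepAvgTime
  }
}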