From c03c62b353bdf0614ace92b695281013a86d63f6 Mon Sep 17 00:00:00 2001 From: Xing Lin Date: Fri, 9 Jun 2023 11:31:56 -0700 Subject: [PATCH] Added rpcCallSuccesses metric --- .../apache/hadoop/ipc/ProcessingDetails.java | 15 ++++++- .../java/org/apache/hadoop/ipc/Server.java | 4 ++ .../apache/hadoop/ipc/metrics/RpcMetrics.java | 9 ++++ .../src/site/markdown/Metrics.md | 3 ++ .../java/org/apache/hadoop/ipc/TestRPC.java | 44 +++++++++++++++++++ 5 files changed, 74 insertions(+), 1 deletion(-) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProcessingDetails.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProcessingDetails.java index 5b97eec9c11..eba0da5c0ee 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProcessingDetails.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProcessingDetails.java @@ -20,13 +20,15 @@ package org.apache.hadoop.ipc; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto.RpcStatusProto; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.concurrent.TimeUnit; /** - * Stores the times that a call takes to be processed through each step. + * Stores the times that a call takes to be processed through each step and + * its response status. 
*/ @InterfaceStability.Unstable @InterfaceAudience.Private @@ -53,6 +55,9 @@ public class ProcessingDetails { private long[] timings = new long[Timing.values().length]; + // Rpc return status of this call + private RpcStatusProto returnStatus = RpcStatusProto.SUCCESS; + ProcessingDetails(TimeUnit timeUnit) { this.valueTimeUnit = timeUnit; } @@ -81,6 +86,14 @@ public class ProcessingDetails { timings[type.ordinal()] += valueTimeUnit.convert(value, timeUnit); } + public void setReturnStatus(RpcStatusProto status) { + this.returnStatus = status; + } + + public RpcStatusProto getReturnStatus() { + return returnStatus; + } + @Override public String toString() { StringBuilder sb = new StringBuilder(256); diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java index e9a605a0043..451ce2619b9 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java @@ -640,6 +640,9 @@ public abstract class Server { if (isLogSlowRPC()) { logSlowRpcCalls(name, call, details); } + if (details.getReturnStatus() == RpcStatusProto.SUCCESS) { + rpcMetrics.incrRpcCallSuccesses(); + } } void updateDeferredMetrics(String name, long processingTime) { @@ -1237,6 +1240,7 @@ public abstract class Server { setResponseFields(value, responseParams); sendResponse(); + details.setReturnStatus(responseParams.returnStatus); deltaNanos = Time.monotonicNowNanos() - startNanos; details.set(Timing.RESPONSE, deltaNanos, TimeUnit.NANOSECONDS); } else { diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/metrics/RpcMetrics.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/metrics/RpcMetrics.java index c18562441fc..b5e09435a46 100644 --- 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/metrics/RpcMetrics.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/metrics/RpcMetrics.java @@ -138,6 +138,8 @@ public class RpcMetrics { MutableCounterLong rpcSlowCalls; @Metric("Number of requeue calls") MutableCounterLong rpcRequeueCalls; + @Metric("Number of successful RPC calls") + MutableCounterLong rpcCallSuccesses; @Metric("Number of open connections") public int numOpenConnections() { return server.getNumOpenConnections(); @@ -330,6 +332,13 @@ public class RpcMetrics { rpcRequeueCalls.incr(); } + /** + * One RPC call success event. + */ + public void incrRpcCallSuccesses() { + rpcCallSuccesses.incr(); + } + /** * Returns a MutableRate Counter. * @return Mutable Rate diff --git a/hadoop-common-project/hadoop-common/src/site/markdown/Metrics.md b/hadoop-common-project/hadoop-common/src/site/markdown/Metrics.md index 0777fc42abe..eeb8bcf50ba 100644 --- a/hadoop-common-project/hadoop-common/src/site/markdown/Metrics.md +++ b/hadoop-common-project/hadoop-common/src/site/markdown/Metrics.md @@ -82,6 +82,9 @@ The default timeunit used for RPC metrics is milliseconds (as per the below desc | `RpcAuthenticationSuccesses` | Total number of authentication successes | | `RpcAuthorizationFailures` | Total number of authorization failures | | `RpcAuthorizationSuccesses` | Total number of authorization successes | +| `RpcClientBackoff` | Total number of client backoff requests | +| `RpcSlowCalls` | Total number of slow RPC calls | +| `RpcCallSuccesses` | Total number of RPC calls that are successfully processed | | `NumOpenConnections` | Current number of open connections | | `NumInProcessHandler` | Current number of handlers on working | | `CallQueueLength` | Current length of the call queue | diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java 
b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java index bbc241a420e..6e5cb7059c6 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java @@ -1397,6 +1397,50 @@ public class TestRPC extends TestRpcBase { } } + /** + * Test the rpcCallSuccesses metric in RpcMetrics. + */ + @Test + public void testRpcCallSuccessesMetric() throws Exception { + final Server server; + TestRpcService proxy = null; + + server = setupTestServer(conf, 5); + try { + proxy = getClient(addr, conf); + + // 10 successful responses + for (int i = 0; i < 10; i++) { + proxy.ping(null, newEmptyRequest()); + } + MetricsRecordBuilder rpcMetrics = + getMetrics(server.getRpcMetrics().name()); + assertEquals("Expected correct rpcCallSuccesses count", 10, + getLongCounter("RpcCallSuccesses", rpcMetrics)); + // rpcQueueTimeNumOps equals total number of RPC calls. + assertEquals("Expected correct rpcQueueTime count", 10, + getLongCounter("RpcQueueTimeNumOps", rpcMetrics)); + + // 2 failed responses with ERROR status and 1 more successful response. + for (int i = 0; i < 2; i++) { + try { + proxy.error(null, newEmptyRequest()); + } catch (ServiceException ignored) { + } + } + proxy.ping(null, newEmptyRequest()); + + rpcMetrics = getMetrics(server.getRpcMetrics().name()); + assertEquals("Expected correct rpcCallSuccesses count", 11, + getLongCounter("RpcCallSuccesses", rpcMetrics)); + // rpcQueueTimeNumOps equals total number of RPC calls. + assertEquals("Expected correct rpcQueueTime count", 13, + getLongCounter("RpcQueueTimeNumOps", rpcMetrics)); + } finally { + stop(server, proxy); + } + } + /** * Test RPC backoff by queue full. */