diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java index 78529565a52..41afb6ee64e 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java @@ -498,6 +498,7 @@ public abstract class Server { private Map auxiliaryListenerMap; private Responder responder = null; private Handler[] handlers = null; + private final AtomicInteger numInProcessHandler = new AtomicInteger(); private boolean logSlowRPC = false; @@ -509,6 +510,10 @@ public abstract class Server { return logSlowRPC; } + public int getNumInProcessHandler() { + return numInProcessHandler.get(); + } + /** * Sets slow RPC flag. * @param logSlowRPCFlag @@ -3080,6 +3085,7 @@ public abstract class Server { try { call = callQueue.take(); // pop the queue; maybe blocked here + numInProcessHandler.incrementAndGet(); startTimeNanos = Time.monotonicNowNanos(); if (alignmentContext != null && call.isCallCoordinated() && call.getClientStateId() > alignmentContext.getLastSeenStateId()) { @@ -3133,6 +3139,7 @@ public abstract class Server { } } finally { CurCall.set(null); + numInProcessHandler.decrementAndGet(); IOUtils.cleanupWithLogger(LOG, traceScope); if (call != null) { updateMetrics(call, startTimeNanos, connDropped); diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/metrics/RpcMetrics.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/metrics/RpcMetrics.java index cbb4bb36d1d..a67530b3c97 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/metrics/RpcMetrics.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/metrics/RpcMetrics.java @@ -133,6 +133,11 @@ public class RpcMetrics { return server.getNumOpenConnections(); } + @Metric("Number of in process handlers") + public int getNumInProcessHandler() { + return server.getNumInProcessHandler(); + } + @Metric("Number of open connections per user") public String numOpenConnectionsPerUser() { return server.getNumOpenConnectionsPerUser(); @@ -288,6 +293,7 @@ public class RpcMetrics { public void incrSlowRpc() { rpcSlowCalls.incr(); } + /** * Returns a MutableRate Counter. * @return Mutable Rate diff --git a/hadoop-common-project/hadoop-common/src/site/markdown/Metrics.md b/hadoop-common-project/hadoop-common/src/site/markdown/Metrics.md index 0db6e1a58e4..190f2a83580 100644 --- a/hadoop-common-project/hadoop-common/src/site/markdown/Metrics.md +++ b/hadoop-common-project/hadoop-common/src/site/markdown/Metrics.md @@ -83,6 +83,7 @@ The default timeunit used for RPC metrics is milliseconds (as per the below desc | `RpcAuthorizationFailures` | Total number of authorization failures | | `RpcAuthorizationSuccesses` | Total number of authorization successes | | `NumOpenConnections` | Current number of open connections | +| `NumInProcessHandler` | Current number of handlers on working | | `CallQueueLength` | Current length of the call queue | | `numDroppedConnections` | Total number of dropped connections | | `rpcQueueTime`*num*`sNumOps` | Shows total number of RPC calls (*num* seconds granularity) if `rpc.metrics.quantile.enable` is set to true. *num* is specified by `rpc.metrics.percentiles.intervals`. | diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java index b78900b609e..5fc9cb5410b 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java @@ -18,6 +18,7 @@ package org.apache.hadoop.ipc; +import org.apache.hadoop.ipc.metrics.RpcMetrics; import org.apache.hadoop.thirdparty.protobuf.ServiceException; import org.apache.hadoop.HadoopIllegalArgumentException; import org.apache.hadoop.conf.Configuration; @@ -1107,6 +1108,37 @@ public class TestRPC extends TestRpcBase { } } + @Test + public void testNumInProcessHandlerMetrics() throws Exception { + UserGroupInformation ugi = UserGroupInformation. + createUserForTesting("user123", new String[0]); + // use 1 handler so the callq can be plugged + final Server server = setupTestServer(conf, 1); + try { + RpcMetrics rpcMetrics = server.getRpcMetrics(); + assertEquals(0, rpcMetrics.getNumInProcessHandler()); + + ExternalCall call1 = newExtCall(ugi, () -> { + assertEquals(1, rpcMetrics.getNumInProcessHandler()); + return UserGroupInformation.getCurrentUser().getUserName(); + }); + ExternalCall call2 = newExtCall(ugi, () -> { + assertEquals(1, rpcMetrics.getNumInProcessHandler()); + return null; + }); + + server.queueCall(call1); + server.queueCall(call2); + + // Wait for call1 and call2 to enter the handler. + call1.get(); + call2.get(); + assertEquals(0, rpcMetrics.getNumInProcessHandler()); + } finally { + server.stop(); + } + } + /** * Test RPC backoff by queue full. */