Added rpcCallSuccesses metric

This commit is contained in:
Xing Lin 2023-06-09 11:31:56 -07:00
parent 7a45ef4164
commit c03c62b353
5 changed files with 74 additions and 1 deletions

View File

@ -20,13 +20,15 @@ package org.apache.hadoop.ipc;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto.RpcStatusProto;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
/** /**
* Stores the times that a call takes to be processed through each step. * Stores the times that a call takes to be processed through each step and
* its response status.
*/ */
@InterfaceStability.Unstable @InterfaceStability.Unstable
@InterfaceAudience.Private @InterfaceAudience.Private
@ -53,6 +55,9 @@ public class ProcessingDetails {
private long[] timings = new long[Timing.values().length]; private long[] timings = new long[Timing.values().length];
// Rpc return status of this call
private RpcStatusProto returnStatus = RpcStatusProto.SUCCESS;
ProcessingDetails(TimeUnit timeUnit) { ProcessingDetails(TimeUnit timeUnit) {
this.valueTimeUnit = timeUnit; this.valueTimeUnit = timeUnit;
} }
@ -81,6 +86,14 @@ public class ProcessingDetails {
timings[type.ordinal()] += valueTimeUnit.convert(value, timeUnit); timings[type.ordinal()] += valueTimeUnit.convert(value, timeUnit);
} }
public void setReturnStatus(RpcStatusProto status) {
this.returnStatus = status;
}
public RpcStatusProto getReturnStatus() {
return returnStatus;
}
@Override @Override
public String toString() { public String toString() {
StringBuilder sb = new StringBuilder(256); StringBuilder sb = new StringBuilder(256);

View File

@ -640,6 +640,9 @@ public abstract class Server {
if (isLogSlowRPC()) { if (isLogSlowRPC()) {
logSlowRpcCalls(name, call, details); logSlowRpcCalls(name, call, details);
} }
if (details.getReturnStatus() == RpcStatusProto.SUCCESS) {
rpcMetrics.incrRpcCallSuccesses();
}
} }
void updateDeferredMetrics(String name, long processingTime) { void updateDeferredMetrics(String name, long processingTime) {
@ -1237,6 +1240,7 @@ public abstract class Server {
setResponseFields(value, responseParams); setResponseFields(value, responseParams);
sendResponse(); sendResponse();
details.setReturnStatus(responseParams.returnStatus);
deltaNanos = Time.monotonicNowNanos() - startNanos; deltaNanos = Time.monotonicNowNanos() - startNanos;
details.set(Timing.RESPONSE, deltaNanos, TimeUnit.NANOSECONDS); details.set(Timing.RESPONSE, deltaNanos, TimeUnit.NANOSECONDS);
} else { } else {

View File

@ -138,6 +138,8 @@ public class RpcMetrics {
MutableCounterLong rpcSlowCalls; MutableCounterLong rpcSlowCalls;
@Metric("Number of requeue calls") @Metric("Number of requeue calls")
MutableCounterLong rpcRequeueCalls; MutableCounterLong rpcRequeueCalls;
@Metric("Number of successful RPC calls")
MutableCounterLong rpcCallSuccesses;
@Metric("Number of open connections") public int numOpenConnections() { @Metric("Number of open connections") public int numOpenConnections() {
return server.getNumOpenConnections(); return server.getNumOpenConnections();
@ -330,6 +332,13 @@ public class RpcMetrics {
rpcRequeueCalls.incr(); rpcRequeueCalls.incr();
} }
/**
* One RPC call success event
*/
public void incrRpcCallSuccesses() {
rpcCallSuccesses.incr();
}
/** /**
* Returns a MutableRate Counter. * Returns a MutableRate Counter.
* @return Mutable Rate * @return Mutable Rate

View File

@ -82,6 +82,9 @@ The default timeunit used for RPC metrics is milliseconds (as per the below desc
| `RpcAuthenticationSuccesses` | Total number of authentication successes | | `RpcAuthenticationSuccesses` | Total number of authentication successes |
| `RpcAuthorizationFailures` | Total number of authorization failures | | `RpcAuthorizationFailures` | Total number of authorization failures |
| `RpcAuthorizationSuccesses` | Total number of authorization successes | | `RpcAuthorizationSuccesses` | Total number of authorization successes |
| `RpcClientBackoff` | Total number of client backoff requests |
| `RpcSlowCalls` | Total number of slow RPC calls |
| `RpcCallsSuccesses` | Total number of RPC calls that are successfully processed |
| `NumOpenConnections` | Current number of open connections | | `NumOpenConnections` | Current number of open connections |
| `NumInProcessHandler` | Current number of handlers on working | | `NumInProcessHandler` | Current number of handlers on working |
| `CallQueueLength` | Current length of the call queue | | `CallQueueLength` | Current length of the call queue |

View File

@ -1397,6 +1397,50 @@ public class TestRPC extends TestRpcBase {
} }
} }
/**
* Test the rpcCallSucesses metric in RpcMetrics.
*/
@Test
public void testRpcCallSuccessesMetric() throws Exception {
final Server server;
TestRpcService proxy = null;
server = setupTestServer(conf, 5);
try {
proxy = getClient(addr, conf);
// 10 successful responses
for (int i = 0; i < 10; i++) {
proxy.ping(null, newEmptyRequest());
}
MetricsRecordBuilder rpcMetrics =
getMetrics(server.getRpcMetrics().name());
assertEquals("Expected correct rpcCallSuccesses count", 10,
getLongCounter("RpcCallSuccesses", rpcMetrics));
// rpcQueueTimeNumOps equals total number of RPC calls.
assertEquals("Expected correct rpcQueueTime count", 10,
getLongCounter("RpcQueueTimeNumOps", rpcMetrics));
// 2 failed responses with ERROR status and 1 more successful response.
for (int i = 0; i < 2; i++) {
try {
proxy.error(null, newEmptyRequest());
} catch (ServiceException ignored) {
}
}
proxy.ping(null, newEmptyRequest());
rpcMetrics = getMetrics(server.getRpcMetrics().name());
assertEquals("Expected correct rpcCallSuccesses count", 11,
getLongCounter("RpcCallSuccesses", rpcMetrics));
// rpcQueueTimeNumOps equals total number of RPC calls.
assertEquals("Expected correct rpcQueueTime count", 13,
getLongCounter("RpcQueueTimeNumOps", rpcMetrics));
} finally {
stop(server, proxy);
}
}
/** /**
* Test RPC backoff by queue full. * Test RPC backoff by queue full.
*/ */