Added per-method overallRpcProcessingTime metric

This commit is contained in:
Xing Lin 2023-06-09 12:14:13 -07:00
parent c03c62b353
commit d1666601d5
3 changed files with 73 additions and 4 deletions

View File

@ -600,17 +600,18 @@ public abstract class Server {
}
}
void updateMetrics(Call call, long startTime, boolean connDropped) {
void updateMetrics(Call call, long processingStartTime, boolean connDropped) {
totalRequests.increment();
// delta = handler + processing + response
long deltaNanos = Time.monotonicNowNanos() - startTime;
long timestampNanos = call.timestampNanos;
long completionTime = Time.monotonicNowNanos();
long deltaNanos = completionTime - processingStartTime;
long arrivalTime = call.timestampNanos;
ProcessingDetails details = call.getProcessingDetails();
// queue time is the delta between when the call first arrived and when it
// began being serviced, minus the time it took to be put into the queue
details.set(Timing.QUEUE,
startTime - timestampNanos - details.get(Timing.ENQUEUE));
processingStartTime - arrivalTime - details.get(Timing.ENQUEUE));
deltaNanos -= details.get(Timing.PROCESSING);
deltaNanos -= details.get(Timing.RESPONSE);
details.set(Timing.HANDLER, deltaNanos);
@ -636,6 +637,10 @@ public abstract class Server {
processingTime -= waitTime;
String name = call.getDetailedMetricsName();
rpcDetailedMetrics.addProcessingTime(name, processingTime);
// Overall processing time is from arrival to completion.
rpcDetailedMetrics.addOverallProcessingTime(name,
rpcMetrics.getMetricsTimeUnit().convert(completionTime - arrivalTime,
TimeUnit.NANOSECONDS));
callQueue.addResponseTime(name, call, details);
if (isLogSlowRPC()) {
logSlowRpcCalls(name, call, details);

View File

@ -26,6 +26,9 @@ import org.apache.hadoop.metrics2.lib.MutableRatesWithAggregation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.commons.lang3.StringUtils.capitalize;
/**
* This class is for maintaining RPC method related statistics
* and publishing them through the metrics interfaces.
@ -33,14 +36,26 @@ import org.slf4j.LoggerFactory;
@InterfaceAudience.Private
@Metrics(about="Per method RPC metrics", context="rpcdetailed")
public class RpcDetailedMetrics {
static final String OVERALL_PROCESSING_PREFIX = "Overall";
// per-method RPC processing time
@Metric MutableRatesWithAggregation rates;
@Metric MutableRatesWithAggregation deferredRpcRates;
/**
* per-method overall RPC processing time, from request arrival to when the
* response is sent back.
*/
@Metric MutableRatesWithAggregation overallRpcProcessingRates;
static final Logger LOG = LoggerFactory.getLogger(RpcDetailedMetrics.class);
final MetricsRegistry registry;
final String name;
/**
 * Returns the collection holding the per-method overall RPC processing
 * time rates. Exposed mainly to facilitate testing in TestRPC.java.
 *
 * @return the overallRpcProcessingRates field of this instance
 */
public MutableRatesWithAggregation getOverallRpcProcessingRates() {
return overallRpcProcessingRates;
}
RpcDetailedMetrics(int port) {
name = "RpcDetailedActivityForPort"+ port;
registry = new MetricsRegistry("rpcdetailed")
@ -62,6 +77,7 @@ public class RpcDetailedMetrics {
public void init(Class<?> protocol) {
// Hand the protocol class to each of the three per-method rate collections.
rates.init(protocol);
// The deferred rates are initialized with the "Deferred" prefix argument.
deferredRpcRates.init(protocol, "Deferred");
// NOTE(review): the overall rates are initialized without a prefix, while
// addOverallProcessingTime records samples under
// OVERALL_PROCESSING_PREFIX + capitalize(name). Confirm that the names
// registered here are the ones intended for the overall metrics.
overallRpcProcessingRates.init(protocol);
}
/**
@ -78,6 +94,16 @@ public class RpcDetailedMetrics {
deferredRpcRates.add(name, processingTime);
}
/**
 * Records one overall RPC processing time sample for the given call.
 * The sample is stored in overallRpcProcessingRates under the metric name
 * formed by OVERALL_PROCESSING_PREFIX followed by the capitalized call name.
 *
 * @param rpcCallName name of the RPC call the sample belongs to
 * @param overallProcessingTime the overall RPC processing time to record,
 *        in whatever unit the caller supplies
 */
public void addOverallProcessingTime(String rpcCallName, long overallProcessingTime) {
  overallRpcProcessingRates.add(
      OVERALL_PROCESSING_PREFIX + capitalize(rpcCallName),
      overallProcessingTime);
}
/**
* Shutdown the instrumentation for the process
*/

View File

@ -39,6 +39,7 @@ import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto.Rpc
import org.apache.hadoop.ipc.protobuf.TestProtos;
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.apache.hadoop.metrics2.lib.MutableCounterLong;
import org.apache.hadoop.metrics2.lib.MutableRatesWithAggregation;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.SecurityUtil;
@ -95,6 +96,8 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;
import static org.apache.hadoop.test.MetricsAsserts.assertGaugeGt;
import static org.apache.hadoop.test.MetricsAsserts.mockMetricsRecordBuilder;
import static org.assertj.core.api.Assertions.assertThat;
import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
import static org.apache.hadoop.test.MetricsAsserts.assertCounterGt;
@ -1441,6 +1444,41 @@ public class TestRPC extends TestRpcBase {
}
}
/**
 * Tests the per-method overall RPC processing time metric.
 *
 * Issues one ping and two lockAndSleep calls against a test server, takes a
 * snapshot of the server's overall RPC processing rates and checks the
 * operation counts and average times reported for each method.
 */
@Test
public void testOverallRpcProcessingTimeMetric() throws Exception {
  final Server server = setupTestServer(conf, 5);
  TestRpcService proxy = null;
  try {
    proxy = getClient(addr, conf);

    // Send 1 ping request and 2 lockAndSleep requests.
    proxy.ping(null, newEmptyRequest());
    proxy.lockAndSleep(null, newSleepRequest(10));
    proxy.lockAndSleep(null, newSleepRequest(12));

    MetricsRecordBuilder rb = mockMetricsRecordBuilder();
    MutableRatesWithAggregation rates =
        server.rpcDetailedMetrics.getOverallRpcProcessingRates();
    rates.snapshot(rb, true);

    // Verify the ping request. AvgTime should be non-zero.
    // The bound is written as a double literal, like the lockAndSleep check
    // below, so both average-time checks resolve to the same assertGaugeGt
    // overload; an int literal would prefer an integral overload if
    // MetricsAsserts declares one (NOTE(review): confirm).
    // NOTE(review): the server converts the sample to the metrics time unit
    // before recording it; a ping that completes in less than one unit may
    // be recorded as 0, which would make this strict check flaky - confirm.
    assertCounter("OverallPingNumOps", 1L, rb);
    assertGaugeGt("OverallPingAvgTime", 0.0, rb);

    // Verify lockAndSleep requests. AvgTime should be greater than 10 ms,
    // since we sleep for 10 and 12 ms respectively.
    assertCounter("OverallLockAndSleepNumOps", 2L, rb);
    assertGaugeGt("OverallLockAndSleepAvgTime", 10.0, rb);
  } finally {
    stop(server, proxy);
  }
}
/**
* Test RPC backoff by queue full.
*/