HADOOP-17127. Use RpcMetrics.TIMEUNIT to initialize rpc queueTime and processingTime. Contributed by Jim Brennan.

This commit is contained in:
Erik Krogen 2020-07-15 08:33:42 -07:00
parent a4b419cdf5
commit f36d524a10
2 changed files with 8 additions and 3 deletions

View File

@ -40,6 +40,7 @@
import org.apache.commons.lang.exception.ExceptionUtils; import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.ipc.metrics.RpcMetrics;
import org.apache.hadoop.metrics2.MetricsCollector; import org.apache.hadoop.metrics2.MetricsCollector;
import org.apache.hadoop.metrics2.MetricsRecordBuilder; import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.apache.hadoop.metrics2.MetricsSource; import org.apache.hadoop.metrics2.MetricsSource;
@ -614,8 +615,8 @@ public void addResponseTime(String callName, Schedulable schedulable,
addCost(user, processingCost); addCost(user, processingCost);
int priorityLevel = schedulable.getPriorityLevel(); int priorityLevel = schedulable.getPriorityLevel();
long queueTime = details.get(Timing.QUEUE, TimeUnit.MILLISECONDS); long queueTime = details.get(Timing.QUEUE, RpcMetrics.TIMEUNIT);
long processingTime = details.get(Timing.PROCESSING, TimeUnit.MILLISECONDS); long processingTime = details.get(Timing.PROCESSING, RpcMetrics.TIMEUNIT);
responseTimeCountInCurrWindow.getAndIncrement(priorityLevel); responseTimeCountInCurrWindow.getAndIncrement(priorityLevel);
responseTimeTotalInCurrWindow.getAndAdd(priorityLevel, responseTimeTotalInCurrWindow.getAndAdd(priorityLevel,

View File

@ -30,6 +30,7 @@
import org.apache.hadoop.ipc.Client.ConnectionId; import org.apache.hadoop.ipc.Client.ConnectionId;
import org.apache.hadoop.ipc.Server.Call; import org.apache.hadoop.ipc.Server.Call;
import org.apache.hadoop.ipc.Server.Connection; import org.apache.hadoop.ipc.Server.Connection;
import org.apache.hadoop.ipc.metrics.RpcMetrics;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto.RpcErrorCodeProto; import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto.RpcErrorCodeProto;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto.RpcStatusProto; import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto.RpcStatusProto;
import org.apache.hadoop.ipc.protobuf.TestProtos; import org.apache.hadoop.ipc.protobuf.TestProtos;
@ -82,6 +83,7 @@
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
@ -1098,7 +1100,9 @@ public TestRpcService run() {
proxy.lockAndSleep(null, newSleepRequest(5)); proxy.lockAndSleep(null, newSleepRequest(5));
rpcMetrics = getMetrics(server.getRpcMetrics().name()); rpcMetrics = getMetrics(server.getRpcMetrics().name());
assertGauge("RpcLockWaitTimeAvgTime", 10000.0, rpcMetrics); assertGauge("RpcLockWaitTimeAvgTime",
(double)(RpcMetrics.TIMEUNIT.convert(10L, TimeUnit.SECONDS)),
rpcMetrics);
} finally { } finally {
if (proxy2 != null) { if (proxy2 != null) {
RPC.stopProxy(proxy2); RPC.stopProxy(proxy2);