HADOOP-17127. Use RpcMetrics.TIMEUNIT to initialize rpc queueTime and processingTime. Contributed by Jim Brennan.

(cherry picked from 317fe4584a51cfe553e4098d48170cd2898b9732)
This commit is contained in:
Erik Krogen 2020-07-14 11:22:16 -07:00 committed by Erik Krogen
parent 5969922305
commit 67e01ed2ca
3 changed files with 14 additions and 9 deletions

View File

@ -43,6 +43,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.ipc.metrics.DecayRpcSchedulerDetailedMetrics;
import org.apache.hadoop.ipc.metrics.RpcMetrics;
import org.apache.hadoop.metrics2.MetricsCollector;
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.apache.hadoop.metrics2.MetricsSource;
@ -632,8 +633,8 @@ public void addResponseTime(String callName, Schedulable schedulable,
addCost(user, processingCost);
int priorityLevel = schedulable.getPriorityLevel();
long queueTime = details.get(Timing.QUEUE, TimeUnit.MILLISECONDS);
long processingTime = details.get(Timing.PROCESSING, TimeUnit.MILLISECONDS);
long queueTime = details.get(Timing.QUEUE, RpcMetrics.TIMEUNIT);
long processingTime = details.get(Timing.PROCESSING, RpcMetrics.TIMEUNIT);
this.decayRpcSchedulerDetailedMetrics.addQueueTime(
priorityLevel, queueTime);

View File

@ -18,7 +18,7 @@
package org.apache.hadoop.ipc;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.ipc.metrics.RpcMetrics;
/**
* Implement this interface to be used for RPC scheduling and backoff.
@ -62,12 +62,12 @@ default void addResponseTime(String callName, Schedulable schedulable,
// this interface, a default implementation is supplied which uses the old
// method. All new implementations MUST override this interface and should
// NOT use the other addResponseTime method.
int queueTimeMs = (int)
details.get(ProcessingDetails.Timing.QUEUE, TimeUnit.MILLISECONDS);
int processingTimeMs = (int)
details.get(ProcessingDetails.Timing.PROCESSING, TimeUnit.MILLISECONDS);
int queueTime = (int)
details.get(ProcessingDetails.Timing.QUEUE, RpcMetrics.TIMEUNIT);
int processingTime = (int)
details.get(ProcessingDetails.Timing.PROCESSING, RpcMetrics.TIMEUNIT);
addResponseTime(callName, schedulable.getPriorityLevel(),
queueTimeMs, processingTimeMs);
queueTime, processingTime);
}
void stop();

View File

@ -29,6 +29,7 @@
import org.apache.hadoop.ipc.Client.ConnectionId;
import org.apache.hadoop.ipc.Server.Call;
import org.apache.hadoop.ipc.Server.Connection;
import org.apache.hadoop.ipc.metrics.RpcMetrics;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto.RpcErrorCodeProto;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto.RpcStatusProto;
import org.apache.hadoop.ipc.protobuf.TestProtos;
@ -81,6 +82,7 @@
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
@ -1095,7 +1097,9 @@ public TestRpcService run() {
proxy.lockAndSleep(null, newSleepRequest(5));
rpcMetrics = getMetrics(server.getRpcMetrics().name());
assertGauge("RpcLockWaitTimeAvgTime", 10000.0, rpcMetrics);
assertGauge("RpcLockWaitTimeAvgTime",
(double)(RpcMetrics.TIMEUNIT.convert(10L, TimeUnit.SECONDS)),
rpcMetrics);
} finally {
if (proxy2 != null) {
RPC.stopProxy(proxy2);