HADOOP-17127. Use RpcMetrics.TIMEUNIT to initialize rpc queueTime and processingTime. Contributed by Jim Brennan.
This commit is contained in:
parent
43a865dc07
commit
bd37b72ea9
@ -42,6 +42,7 @@
|
||||
import org.apache.commons.lang3.exception.ExceptionUtils;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.CommonConfigurationKeys;
|
||||
import org.apache.hadoop.ipc.metrics.RpcMetrics;
|
||||
import org.apache.hadoop.metrics2.MetricsCollector;
|
||||
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
|
||||
import org.apache.hadoop.metrics2.MetricsSource;
|
||||
@ -622,8 +623,8 @@ public void addResponseTime(String callName, Schedulable schedulable,
|
||||
addCost(user, processingCost);
|
||||
|
||||
int priorityLevel = schedulable.getPriorityLevel();
|
||||
long queueTime = details.get(Timing.QUEUE, TimeUnit.MILLISECONDS);
|
||||
long processingTime = details.get(Timing.PROCESSING, TimeUnit.MILLISECONDS);
|
||||
long queueTime = details.get(Timing.QUEUE, RpcMetrics.TIMEUNIT);
|
||||
long processingTime = details.get(Timing.PROCESSING, RpcMetrics.TIMEUNIT);
|
||||
|
||||
responseTimeCountInCurrWindow.getAndIncrement(priorityLevel);
|
||||
responseTimeTotalInCurrWindow.getAndAdd(priorityLevel,
|
||||
|
@ -18,7 +18,7 @@
|
||||
|
||||
package org.apache.hadoop.ipc;
|
||||
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import org.apache.hadoop.ipc.metrics.RpcMetrics;
|
||||
|
||||
/**
|
||||
* Implement this interface to be used for RPC scheduling and backoff.
|
||||
@ -62,12 +62,12 @@ default void addResponseTime(String callName, Schedulable schedulable,
|
||||
// this interface, a default implementation is supplied which uses the old
|
||||
// method. All new implementations MUST override this interface and should
|
||||
// NOT use the other addResponseTime method.
|
||||
int queueTimeMs = (int)
|
||||
details.get(ProcessingDetails.Timing.QUEUE, TimeUnit.MILLISECONDS);
|
||||
int processingTimeMs = (int)
|
||||
details.get(ProcessingDetails.Timing.PROCESSING, TimeUnit.MILLISECONDS);
|
||||
int queueTime = (int)
|
||||
details.get(ProcessingDetails.Timing.QUEUE, RpcMetrics.TIMEUNIT);
|
||||
int processingTime = (int)
|
||||
details.get(ProcessingDetails.Timing.PROCESSING, RpcMetrics.TIMEUNIT);
|
||||
addResponseTime(callName, schedulable.getPriorityLevel(),
|
||||
queueTimeMs, processingTimeMs);
|
||||
queueTime, processingTime);
|
||||
}
|
||||
|
||||
void stop();
|
||||
|
@ -29,6 +29,7 @@
|
||||
import org.apache.hadoop.ipc.Client.ConnectionId;
|
||||
import org.apache.hadoop.ipc.Server.Call;
|
||||
import org.apache.hadoop.ipc.Server.Connection;
|
||||
import org.apache.hadoop.ipc.metrics.RpcMetrics;
|
||||
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto.RpcErrorCodeProto;
|
||||
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto.RpcStatusProto;
|
||||
import org.apache.hadoop.ipc.protobuf.TestProtos;
|
||||
@ -81,6 +82,7 @@
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.ThreadLocalRandom;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
@ -1094,7 +1096,9 @@ public TestRpcService run() {
|
||||
|
||||
proxy.lockAndSleep(null, newSleepRequest(5));
|
||||
rpcMetrics = getMetrics(server.getRpcMetrics().name());
|
||||
assertGauge("RpcLockWaitTimeAvgTime", 10000.0, rpcMetrics);
|
||||
assertGauge("RpcLockWaitTimeAvgTime",
|
||||
(double)(RpcMetrics.TIMEUNIT.convert(10L, TimeUnit.SECONDS)),
|
||||
rpcMetrics);
|
||||
} finally {
|
||||
if (proxy2 != null) {
|
||||
RPC.stopProxy(proxy2);
|
||||
|
Loading…
x
Reference in New Issue
Block a user