diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java index 07a2f13a442..f01ac3090b3 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java @@ -34,6 +34,7 @@ import org.apache.hadoop.io.WritableUtils; import org.apache.hadoop.io.retry.RetryPolicies; import org.apache.hadoop.io.retry.RetryPolicy; import org.apache.hadoop.io.retry.RetryPolicy.RetryAction; +import org.apache.hadoop.ipc.metrics.RpcDetailedMetrics; import org.apache.hadoop.ipc.RPC.RpcKind; import org.apache.hadoop.ipc.Server.AuthProtocol; import org.apache.hadoop.ipc.protobuf.IpcConnectionContextProtos.IpcConnectionContextProto; @@ -86,6 +87,7 @@ import static org.apache.hadoop.ipc.RpcConstants.PING_CALL_ID; public class Client implements AutoCloseable { public static final Logger LOG = LoggerFactory.getLogger(Client.class); + private final RpcDetailedMetrics rpcDetailedMetrics; /** A counter for generating call IDs. */ private static final AtomicInteger callIdCounter = new AtomicInteger(); @@ -208,6 +210,24 @@ public class Client implements AutoCloseable { } }; + /** + * Update a particular metric by recording the processing + * time of the metric. + * + * @param name Metric name + * @param processingTime time spent in processing the metric. + */ + public void updateMetrics(String name, long processingTime) { + rpcDetailedMetrics.addProcessingTime(name, processingTime); + } + + /** + * Get the RpcDetailedMetrics associated with the Client. + */ + public RpcDetailedMetrics getRpcDetailedMetrics() { + return rpcDetailedMetrics; + } + /** * set the ping interval value in configuration * @@ -1301,6 +1321,11 @@ public class Client implements AutoCloseable { this.maxAsyncCalls = conf.getInt( CommonConfigurationKeys.IPC_CLIENT_ASYNC_CALLS_MAX_KEY, CommonConfigurationKeys.IPC_CLIENT_ASYNC_CALLS_MAX_DEFAULT); + /** + * Create with port of -1, dummy port since the function + * takes default argument. + */ + this.rpcDetailedMetrics = RpcDetailedMetrics.create(-1); } /** diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java index 639bbadffbd..fa3b61a11c6 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java @@ -49,6 +49,8 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.hadoop.metrics2.MetricStringBuilder; +import org.apache.hadoop.metrics2.lib.MutableRatesWithAggregation; /** * RPC Engine for for protobuf based RPCs. @@ -190,7 +192,7 @@ public class ProtobufRpcEngine implements RpcEngine { throws ServiceException { long startTime = 0; if (LOG.isDebugEnabled()) { - startTime = Time.now(); + startTime = System.currentTimeMillis(); } if (args.length != 2) { // RpcController + Message @@ -245,8 +247,16 @@ public class ProtobufRpcEngine implements RpcEngine { } if (LOG.isDebugEnabled()) { - long callTime = Time.now() - startTime; - LOG.debug("Call: " + method.getName() + " took " + callTime + "ms"); + long callTime = System.currentTimeMillis() - startTime; + if (callTime > 0) { + MetricStringBuilder rb = + new MetricStringBuilder(null, "", " = ", "\n"); + client.updateMetrics(method.getName(), callTime); + MutableRatesWithAggregation rates = + client.getRpcDetailedMetrics().getMutableRates(); + rates.snapshot(rb, true); + LOG.debug("RPC Client stats: {}", rb); + } } if (Client.isAsynchronousMode()) { diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/metrics/RpcDetailedMetrics.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/metrics/RpcDetailedMetrics.java index 6ed57ec6d97..9be9c5addde 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/metrics/RpcDetailedMetrics.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/metrics/RpcDetailedMetrics.java @@ -70,12 +70,16 @@ public class RpcDetailedMetrics { * @param processingTime the processing time */ //@Override // some instrumentation interface - public void addProcessingTime(String name, int processingTime) { - rates.add(name, processingTime); + public void addProcessingTime(String metName, long processingTime) { + rates.add(metName, processingTime); } - public void addDeferredProcessingTime(String name, long processingTime) { - deferredRpcRates.add(name, processingTime); + public void addDeferredProcessingTime(String metName, long processingTime) { + deferredRpcRates.add(metName, processingTime); + } + + public MutableRatesWithAggregation getMutableRates() { + return rates; } /**