diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt index 00116bb2c3e..a608c8dd350 100644 --- a/hadoop-common-project/hadoop-common/CHANGES.txt +++ b/hadoop-common-project/hadoop-common/CHANGES.txt @@ -89,6 +89,9 @@ Trunk (unreleased changes) HADOOP-8084. Updates ProtoBufRpc engine to not do an unnecessary copy for RPC request/response. (ddas) + HADOOP-8085. Add RPC metrics to ProtobufRpcEngine. (Hari Mankude via + suresh) + BUG FIXES HADOOP-8018. Hudson auto test for HDFS has started throwing javadoc diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java index 9a1a1615b0c..ff2c1a4a532 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java @@ -379,6 +379,24 @@ public class ProtobufRpcEngine implements RpcEngine { * Protobuf invoker for {@link RpcInvoker} */ static class ProtoBufRpcInvoker implements RpcInvoker { + private static ProtoClassProtoImpl getProtocolImpl(RPC.Server server, + String protoName, long version) throws IOException { + ProtoNameVer pv = new ProtoNameVer(protoName, version); + ProtoClassProtoImpl impl = + server.getProtocolImplMap(RpcKind.RPC_PROTOCOL_BUFFER).get(pv); + if (impl == null) { // no match for Protocol AND Version + VerProtocolImpl highest = + server.getHighestSupportedProtocol(RpcKind.RPC_PROTOCOL_BUFFER, + protoName); + if (highest == null) { + throw new IOException("Unknown protocol: " + protoName); + } + // protocol supported but not the version that client wants + throw new RPC.VersionMismatch(protoName, version, + highest.version); + } + return impl; + } @Override /** @@ -409,21 +427,8 @@ public class ProtobufRpcEngine implements RpcEngine { if (server.verbose) LOG.info("Call: protocol=" + protocol + ", method=" + methodName); - ProtoNameVer pv = new ProtoNameVer(protoName, clientVersion); - ProtoClassProtoImpl protocolImpl = - server.getProtocolImplMap(RpcKind.RPC_PROTOCOL_BUFFER).get(pv); - if (protocolImpl == null) { // no match for Protocol AND Version - VerProtocolImpl highest = - server.getHighestSupportedProtocol(RpcKind.RPC_PROTOCOL_BUFFER, - protoName); - if (highest == null) { - throw new IOException("Unknown protocol: " + protoName); - } - // protocol supported but not the version that client wants - throw new RPC.VersionMismatch(protoName, clientVersion, - highest.version); - } - + ProtoClassProtoImpl protocolImpl = getProtocolImpl(server, protoName, + clientVersion); BlockingService service = (BlockingService) protocolImpl.protocolImpl; MethodDescriptor methodDescriptor = service.getDescriptorForType() .findMethodByName(methodName); @@ -438,7 +443,19 @@ public class ProtobufRpcEngine implements RpcEngine { .mergeFrom(rpcRequest.getRequest()).build(); Message result; try { + long startTime = System.currentTimeMillis(); + server.rpcDetailedMetrics.init(protocolImpl.protocolClass); result = service.callBlockingMethod(methodDescriptor, null, param); + int processingTime = (int) (System.currentTimeMillis() - startTime); + int qTime = (int) (startTime - receiveTime); + if (LOG.isDebugEnabled()) { + LOG.info("Served: " + methodName + " queueTime= " + qTime + + " procesingTime= " + processingTime); + } + server.rpcMetrics.addRpcQueueTime(qTime); + server.rpcMetrics.addRpcProcessingTime(processingTime); + server.rpcDetailedMetrics.addProcessingTime(methodName, + processingTime); } catch (ServiceException e) { Throwable cause = e.getCause(); return handleException(cause != null ? cause : e); diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java index c40dac584a6..289bee77fe7 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java @@ -317,10 +317,15 @@ public abstract class Server { * Returns a handle to the rpcMetrics (required in tests) * @return rpc metrics */ + @VisibleForTesting public RpcMetrics getRpcMetrics() { return rpcMetrics; } + @VisibleForTesting + public RpcDetailedMetrics getRpcDetailedMetrics() { + return rpcDetailedMetrics; + } @VisibleForTesting Iterable getHandlers() { diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestProtoBufRpc.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestProtoBufRpc.java index ea48b98db4d..3b9140afc4c 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestProtoBufRpc.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestProtoBufRpc.java @@ -17,6 +17,9 @@ */ package org.apache.hadoop.ipc; +import static org.apache.hadoop.test.MetricsAsserts.getMetrics; +import static org.apache.hadoop.test.MetricsAsserts.assertCounterGt; + import java.io.IOException; import java.net.InetSocketAddress; @@ -28,6 +31,7 @@ import org.apache.hadoop.ipc.protobuf.TestProtos.EmptyRequestProto; import org.apache.hadoop.ipc.protobuf.TestProtos.EmptyResponseProto; import org.apache.hadoop.ipc.protobuf.TestRpcServiceProtos.TestProtobufRpcProto; import org.apache.hadoop.ipc.protobuf.TestRpcServiceProtos.TestProtobufRpc2Proto; +import org.apache.hadoop.metrics2.MetricsRecordBuilder; import org.apache.hadoop.net.NetUtils; import org.junit.Assert; import org.junit.Test; @@ -187,5 +191,14 @@ public class TestProtoBufRpc { .setMessage("hello").build(); EchoResponseProto echoResponse = client.echo2(null, echoRequest); Assert.assertEquals(echoResponse.getMessage(), "hello"); + + // Ensure RPC metrics are updated + MetricsRecordBuilder rpcMetrics = getMetrics(server.getRpcMetrics().name()); + assertCounterGt("RpcQueueTimeNumOps", 0L, rpcMetrics); + assertCounterGt("RpcProcessingTimeNumOps", 0L, rpcMetrics); + + MetricsRecordBuilder rpcDetailedMetrics = + getMetrics(server.getRpcDetailedMetrics().name()); + assertCounterGt("Echo2NumOps", 0L, rpcDetailedMetrics); } } \ No newline at end of file