svn merge -c 1293071 from trunk for HADOOP-8085.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23-PB@1293344 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Tsz-wo Sze 2012-02-24 17:14:12 +00:00
parent 9cb779d735
commit e0847086a5
4 changed files with 53 additions and 15 deletions

View File

@ -50,6 +50,9 @@ Release 0.23-PB - Unreleased
HADOOP-8084. Updates ProtoBufRpc engine to not do an unnecessary copy
for RPC request/response. (ddas)
HADOOP-8085. Add RPC metrics to ProtobufRpcEngine. (Hari Mankude via
suresh)
BUG FIXES
HADOOP-7695. RPC.stopProxy can throw unintended exception while logging

View File

@ -379,6 +379,24 @@ public class ProtobufRpcEngine implements RpcEngine {
* Protobuf invoker for {@link RpcInvoker}
*/
static class ProtoBufRpcInvoker implements RpcInvoker {
private static ProtoClassProtoImpl getProtocolImpl(RPC.Server server,
String protoName, long version) throws IOException {
ProtoNameVer pv = new ProtoNameVer(protoName, version);
ProtoClassProtoImpl impl =
server.getProtocolImplMap(RpcKind.RPC_PROTOCOL_BUFFER).get(pv);
if (impl == null) { // no match for Protocol AND Version
VerProtocolImpl highest =
server.getHighestSupportedProtocol(RpcKind.RPC_PROTOCOL_BUFFER,
protoName);
if (highest == null) {
throw new IOException("Unknown protocol: " + protoName);
}
// protocol supported but not the version that client wants
throw new RPC.VersionMismatch(protoName, version,
highest.version);
}
return impl;
}
@Override
/**
@ -409,21 +427,8 @@ public class ProtobufRpcEngine implements RpcEngine {
if (server.verbose)
LOG.info("Call: protocol=" + protocol + ", method=" + methodName);
ProtoNameVer pv = new ProtoNameVer(protoName, clientVersion);
ProtoClassProtoImpl protocolImpl =
server.getProtocolImplMap(RpcKind.RPC_PROTOCOL_BUFFER).get(pv);
if (protocolImpl == null) { // no match for Protocol AND Version
VerProtocolImpl highest =
server.getHighestSupportedProtocol(RpcKind.RPC_PROTOCOL_BUFFER,
protoName);
if (highest == null) {
throw new IOException("Unknown protocol: " + protoName);
}
// protocol supported but not the version that client wants
throw new RPC.VersionMismatch(protoName, clientVersion,
highest.version);
}
ProtoClassProtoImpl protocolImpl = getProtocolImpl(server, protoName,
clientVersion);
BlockingService service = (BlockingService) protocolImpl.protocolImpl;
MethodDescriptor methodDescriptor = service.getDescriptorForType()
.findMethodByName(methodName);
@ -438,7 +443,19 @@ public class ProtobufRpcEngine implements RpcEngine {
.mergeFrom(rpcRequest.getRequest()).build();
Message result;
try {
long startTime = System.currentTimeMillis();
server.rpcDetailedMetrics.init(protocolImpl.protocolClass);
result = service.callBlockingMethod(methodDescriptor, null, param);
int processingTime = (int) (System.currentTimeMillis() - startTime);
int qTime = (int) (startTime - receiveTime);
if (LOG.isDebugEnabled()) {
LOG.info("Served: " + methodName + " queueTime= " + qTime +
" procesingTime= " + processingTime);
}
server.rpcMetrics.addRpcQueueTime(qTime);
server.rpcMetrics.addRpcProcessingTime(processingTime);
server.rpcDetailedMetrics.addProcessingTime(methodName,
processingTime);
} catch (ServiceException e) {
Throwable cause = e.getCause();
return handleException(cause != null ? cause : e);

View File

@ -300,10 +300,15 @@ public abstract class Server {
* Returns a handle to the rpcMetrics (required in tests)
* @return rpc metrics
*/
@VisibleForTesting
public RpcMetrics getRpcMetrics() {
return rpcMetrics;
}
@VisibleForTesting
public RpcDetailedMetrics getRpcDetailedMetrics() {
return rpcDetailedMetrics;
}
@VisibleForTesting
Iterable<? extends Thread> getHandlers() {

View File

@ -17,6 +17,9 @@
*/
package org.apache.hadoop.ipc;
import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
import static org.apache.hadoop.test.MetricsAsserts.assertCounterGt;
import java.io.IOException;
import java.net.InetSocketAddress;
@ -28,6 +31,7 @@ import org.apache.hadoop.ipc.protobuf.TestProtos.EmptyRequestProto;
import org.apache.hadoop.ipc.protobuf.TestProtos.EmptyResponseProto;
import org.apache.hadoop.ipc.protobuf.TestRpcServiceProtos.TestProtobufRpcProto;
import org.apache.hadoop.ipc.protobuf.TestRpcServiceProtos.TestProtobufRpc2Proto;
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.apache.hadoop.net.NetUtils;
import org.junit.Assert;
import org.junit.Test;
@ -187,5 +191,14 @@ public class TestProtoBufRpc {
.setMessage("hello").build();
EchoResponseProto echoResponse = client.echo2(null, echoRequest);
Assert.assertEquals(echoResponse.getMessage(), "hello");
// Ensure RPC metrics are updated
MetricsRecordBuilder rpcMetrics = getMetrics(server.getRpcMetrics().name());
assertCounterGt("RpcQueueTimeNumOps", 0L, rpcMetrics);
assertCounterGt("RpcProcessingTimeNumOps", 0L, rpcMetrics);
MetricsRecordBuilder rpcDetailedMetrics =
getMetrics(server.getRpcDetailedMetrics().name());
assertCounterGt("Echo2NumOps", 0L, rpcDetailedMetrics);
}
}