HADOOP-10673. Merge change r1610879 from trunk.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1610881 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Jing Zhao 2014-07-15 23:12:20 +00:00
parent 9463cbd1ec
commit 32d9fd2ca3
5 changed files with 57 additions and 34 deletions

View File

@ -16,6 +16,9 @@ Release 2.6.0 - UNRELEASED
if the override value is same as the final parameter value.
(Ravi Prakash via suresh)
HADOOP-10673. Update rpc metrics when the call throws an exception. (Ming Ma
via jing9)
OPTIMIZATIONS
BUG FIXES

View File

@ -579,24 +579,35 @@ public class ProtobufRpcEngine implements RpcEngine {
.mergeFrom(request.theRequestRead).build();
Message result;
long startTime = Time.now();
int qTime = (int) (startTime - receiveTime);
Exception exception = null;
try {
long startTime = Time.now();
server.rpcDetailedMetrics.init(protocolImpl.protocolClass);
result = service.callBlockingMethod(methodDescriptor, null, param);
int processingTime = (int) (Time.now() - startTime);
int qTime = (int) (startTime - receiveTime);
if (LOG.isDebugEnabled()) {
LOG.info("Served: " + methodName + " queueTime= " + qTime +
" procesingTime= " + processingTime);
}
server.rpcMetrics.addRpcQueueTime(qTime);
server.rpcMetrics.addRpcProcessingTime(processingTime);
server.rpcDetailedMetrics.addProcessingTime(methodName,
processingTime);
} catch (ServiceException e) {
exception = (Exception) e.getCause();
throw (Exception) e.getCause();
} catch (Exception e) {
exception = e;
throw e;
} finally {
int processingTime = (int) (Time.now() - startTime);
if (LOG.isDebugEnabled()) {
String msg = "Served: " + methodName + " queueTime= " + qTime +
" procesingTime= " + processingTime;
if (exception != null) {
msg += " exception= " + exception.getClass().getSimpleName();
}
LOG.debug(msg);
}
String detailedMetricsName = (exception == null) ?
methodName :
exception.getClass().getSimpleName();
server.rpcMetrics.addRpcQueueTime(qTime);
server.rpcMetrics.addRpcProcessingTime(processingTime);
server.rpcDetailedMetrics.addProcessingTime(detailedMetricsName,
processingTime);
}
return new RpcResponseWrapper(result);
}

View File

@ -352,8 +352,8 @@ public abstract class Server {
private int readThreads; // number of read threads
private int readerPendingConnectionQueue; // number of connections to queue per read thread
private Class<? extends Writable> rpcRequestClass; // class used for deserializing the rpc request
protected RpcMetrics rpcMetrics;
protected RpcDetailedMetrics rpcDetailedMetrics;
final protected RpcMetrics rpcMetrics;
final protected RpcDetailedMetrics rpcDetailedMetrics;
private Configuration conf;
private String portRangeConfig = null;
@ -2408,12 +2408,8 @@ public abstract class Server {
listener.doStop();
responder.interrupt();
notifyAll();
if (this.rpcMetrics != null) {
this.rpcMetrics.shutdown();
}
if (this.rpcDetailedMetrics != null) {
this.rpcDetailedMetrics.shutdown();
}
this.rpcMetrics.shutdown();
this.rpcDetailedMetrics.shutdown();
}
/** Wait for the server to be stopped.

View File

@ -471,37 +471,29 @@ public class WritableRpcEngine implements RpcEngine {
// Invoke the protocol method
long startTime = Time.now();
int qTime = (int) (startTime-receivedTime);
Exception exception = null;
try {
long startTime = Time.now();
Method method =
Method method =
protocolImpl.protocolClass.getMethod(call.getMethodName(),
call.getParameterClasses());
method.setAccessible(true);
server.rpcDetailedMetrics.init(protocolImpl.protocolClass);
Object value =
method.invoke(protocolImpl.protocolImpl, call.getParameters());
int processingTime = (int) (Time.now() - startTime);
int qTime = (int) (startTime-receivedTime);
if (LOG.isDebugEnabled()) {
LOG.debug("Served: " + call.getMethodName() +
" queueTime= " + qTime +
" procesingTime= " + processingTime);
}
server.rpcMetrics.addRpcQueueTime(qTime);
server.rpcMetrics.addRpcProcessingTime(processingTime);
server.rpcDetailedMetrics.addProcessingTime(call.getMethodName(),
processingTime);
if (server.verbose) log("Return: "+value);
return new ObjectWritable(method.getReturnType(), value);
} catch (InvocationTargetException e) {
Throwable target = e.getTargetException();
if (target instanceof IOException) {
exception = (IOException)target;
throw (IOException)target;
} else {
IOException ioe = new IOException(target.toString());
ioe.setStackTrace(target.getStackTrace());
exception = ioe;
throw ioe;
}
} catch (Throwable e) {
@ -510,8 +502,27 @@ public class WritableRpcEngine implements RpcEngine {
}
IOException ioe = new IOException(e.toString());
ioe.setStackTrace(e.getStackTrace());
exception = ioe;
throw ioe;
}
} finally {
int processingTime = (int) (Time.now() - startTime);
if (LOG.isDebugEnabled()) {
String msg = "Served: " + call.getMethodName() +
" queueTime= " + qTime +
" procesingTime= " + processingTime;
if (exception != null) {
msg += " exception= " + exception.getClass().getSimpleName();
}
LOG.debug(msg);
}
String detailedMetricsName = (exception == null) ?
call.getMethodName() :
exception.getClass().getSimpleName();
server.rpcMetrics.addRpcQueueTime(qTime);
server.rpcMetrics.addRpcProcessingTime(processingTime);
server.rpcDetailedMetrics.addProcessingTime(detailedMetricsName,
processingTime);
}
}
}
}

View File

@ -496,6 +496,8 @@ public class TestRPC {
caught = true;
}
assertTrue(caught);
rb = getMetrics(server.rpcDetailedMetrics.name());
assertCounter("IOExceptionNumOps", 1L, rb);
proxy.testServerGet();