HADOOP-10673. Update rpc metrics when the call throws an exception. Contributed by Ming Ma.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1610879 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Jing Zhao 2014-07-15 23:05:40 +00:00
parent 030580387a
commit 790ee45643
5 changed files with 57 additions and 34 deletions

View File

@ -401,6 +401,9 @@ Release 2.6.0 - UNRELEASED
if the override value is same as the final parameter value. if the override value is same as the final parameter value.
(Ravi Prakash via suresh) (Ravi Prakash via suresh)
HADOOP-10673. Update rpc metrics when the call throws an exception. (Ming Ma
via jing9)
OPTIMIZATIONS OPTIMIZATIONS
BUG FIXES BUG FIXES

View File

@ -599,24 +599,35 @@ public class ProtobufRpcEngine implements RpcEngine {
.mergeFrom(request.theRequestRead).build(); .mergeFrom(request.theRequestRead).build();
Message result; Message result;
long startTime = Time.now();
int qTime = (int) (startTime - receiveTime);
Exception exception = null;
try { try {
long startTime = Time.now();
server.rpcDetailedMetrics.init(protocolImpl.protocolClass); server.rpcDetailedMetrics.init(protocolImpl.protocolClass);
result = service.callBlockingMethod(methodDescriptor, null, param); result = service.callBlockingMethod(methodDescriptor, null, param);
int processingTime = (int) (Time.now() - startTime);
int qTime = (int) (startTime - receiveTime);
if (LOG.isDebugEnabled()) {
LOG.info("Served: " + methodName + " queueTime= " + qTime +
" procesingTime= " + processingTime);
}
server.rpcMetrics.addRpcQueueTime(qTime);
server.rpcMetrics.addRpcProcessingTime(processingTime);
server.rpcDetailedMetrics.addProcessingTime(methodName,
processingTime);
} catch (ServiceException e) { } catch (ServiceException e) {
exception = (Exception) e.getCause();
throw (Exception) e.getCause(); throw (Exception) e.getCause();
} catch (Exception e) { } catch (Exception e) {
exception = e;
throw e; throw e;
} finally {
int processingTime = (int) (Time.now() - startTime);
if (LOG.isDebugEnabled()) {
String msg = "Served: " + methodName + " queueTime= " + qTime +
" procesingTime= " + processingTime;
if (exception != null) {
msg += " exception= " + exception.getClass().getSimpleName();
}
LOG.debug(msg);
}
String detailedMetricsName = (exception == null) ?
methodName :
exception.getClass().getSimpleName();
server.rpcMetrics.addRpcQueueTime(qTime);
server.rpcMetrics.addRpcProcessingTime(processingTime);
server.rpcDetailedMetrics.addProcessingTime(detailedMetricsName,
processingTime);
} }
return new RpcResponseWrapper(result); return new RpcResponseWrapper(result);
} }

View File

@ -355,8 +355,8 @@ public abstract class Server {
private int readThreads; // number of read threads private int readThreads; // number of read threads
private int readerPendingConnectionQueue; // number of connections to queue per read thread private int readerPendingConnectionQueue; // number of connections to queue per read thread
private Class<? extends Writable> rpcRequestClass; // class used for deserializing the rpc request private Class<? extends Writable> rpcRequestClass; // class used for deserializing the rpc request
protected RpcMetrics rpcMetrics; final protected RpcMetrics rpcMetrics;
protected RpcDetailedMetrics rpcDetailedMetrics; final protected RpcDetailedMetrics rpcDetailedMetrics;
private Configuration conf; private Configuration conf;
private String portRangeConfig = null; private String portRangeConfig = null;
@ -2494,12 +2494,8 @@ public abstract class Server {
listener.doStop(); listener.doStop();
responder.interrupt(); responder.interrupt();
notifyAll(); notifyAll();
if (this.rpcMetrics != null) { this.rpcMetrics.shutdown();
this.rpcMetrics.shutdown(); this.rpcDetailedMetrics.shutdown();
}
if (this.rpcDetailedMetrics != null) {
this.rpcDetailedMetrics.shutdown();
}
} }
/** Wait for the server to be stopped. /** Wait for the server to be stopped.

View File

@ -471,8 +471,10 @@ public class WritableRpcEngine implements RpcEngine {
// Invoke the protocol method // Invoke the protocol method
long startTime = Time.now();
int qTime = (int) (startTime-receivedTime);
Exception exception = null;
try { try {
long startTime = Time.now();
Method method = Method method =
protocolImpl.protocolClass.getMethod(call.getMethodName(), protocolImpl.protocolClass.getMethod(call.getMethodName(),
call.getParameterClasses()); call.getParameterClasses());
@ -480,28 +482,18 @@ public class WritableRpcEngine implements RpcEngine {
server.rpcDetailedMetrics.init(protocolImpl.protocolClass); server.rpcDetailedMetrics.init(protocolImpl.protocolClass);
Object value = Object value =
method.invoke(protocolImpl.protocolImpl, call.getParameters()); method.invoke(protocolImpl.protocolImpl, call.getParameters());
int processingTime = (int) (Time.now() - startTime);
int qTime = (int) (startTime-receivedTime);
if (LOG.isDebugEnabled()) {
LOG.debug("Served: " + call.getMethodName() +
" queueTime= " + qTime +
" procesingTime= " + processingTime);
}
server.rpcMetrics.addRpcQueueTime(qTime);
server.rpcMetrics.addRpcProcessingTime(processingTime);
server.rpcDetailedMetrics.addProcessingTime(call.getMethodName(),
processingTime);
if (server.verbose) log("Return: "+value); if (server.verbose) log("Return: "+value);
return new ObjectWritable(method.getReturnType(), value); return new ObjectWritable(method.getReturnType(), value);
} catch (InvocationTargetException e) { } catch (InvocationTargetException e) {
Throwable target = e.getTargetException(); Throwable target = e.getTargetException();
if (target instanceof IOException) { if (target instanceof IOException) {
exception = (IOException)target;
throw (IOException)target; throw (IOException)target;
} else { } else {
IOException ioe = new IOException(target.toString()); IOException ioe = new IOException(target.toString());
ioe.setStackTrace(target.getStackTrace()); ioe.setStackTrace(target.getStackTrace());
exception = ioe;
throw ioe; throw ioe;
} }
} catch (Throwable e) { } catch (Throwable e) {
@ -510,8 +502,27 @@ public class WritableRpcEngine implements RpcEngine {
} }
IOException ioe = new IOException(e.toString()); IOException ioe = new IOException(e.toString());
ioe.setStackTrace(e.getStackTrace()); ioe.setStackTrace(e.getStackTrace());
exception = ioe;
throw ioe; throw ioe;
} } finally {
int processingTime = (int) (Time.now() - startTime);
if (LOG.isDebugEnabled()) {
String msg = "Served: " + call.getMethodName() +
" queueTime= " + qTime +
" procesingTime= " + processingTime;
if (exception != null) {
msg += " exception= " + exception.getClass().getSimpleName();
}
LOG.debug(msg);
}
String detailedMetricsName = (exception == null) ?
call.getMethodName() :
exception.getClass().getSimpleName();
server.rpcMetrics.addRpcQueueTime(qTime);
server.rpcMetrics.addRpcProcessingTime(processingTime);
server.rpcDetailedMetrics.addProcessingTime(detailedMetricsName,
processingTime);
}
} }
} }
} }

View File

@ -496,6 +496,8 @@ public class TestRPC {
caught = true; caught = true;
} }
assertTrue(caught); assertTrue(caught);
rb = getMetrics(server.rpcDetailedMetrics.name());
assertCounter("IOExceptionNumOps", 1L, rb);
proxy.testServerGet(); proxy.testServerGet();