diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt index 749c72f5fd5..83bac85b638 100644 --- a/hadoop-common-project/hadoop-common/CHANGES.txt +++ b/hadoop-common-project/hadoop-common/CHANGES.txt @@ -401,6 +401,9 @@ Release 2.6.0 - UNRELEASED if the override value is same as the final parameter value. (Ravi Prakash via suresh) + HADOOP-10673. Update rpc metrics when the call throws an exception. (Ming Ma + via jing9) + OPTIMIZATIONS BUG FIXES diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java index 3bdcbd9856b..64615d22f85 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java @@ -599,24 +599,35 @@ public class ProtobufRpcEngine implements RpcEngine { .mergeFrom(request.theRequestRead).build(); Message result; + long startTime = Time.now(); + int qTime = (int) (startTime - receiveTime); + Exception exception = null; try { - long startTime = Time.now(); server.rpcDetailedMetrics.init(protocolImpl.protocolClass); result = service.callBlockingMethod(methodDescriptor, null, param); - int processingTime = (int) (Time.now() - startTime); - int qTime = (int) (startTime - receiveTime); - if (LOG.isDebugEnabled()) { - LOG.info("Served: " + methodName + " queueTime= " + qTime + - " procesingTime= " + processingTime); - } - server.rpcMetrics.addRpcQueueTime(qTime); - server.rpcMetrics.addRpcProcessingTime(processingTime); - server.rpcDetailedMetrics.addProcessingTime(methodName, - processingTime); } catch (ServiceException e) { + exception = (Exception) e.getCause(); throw (Exception) e.getCause(); } catch (Exception e) { + exception = e; throw e; + } finally { + int processingTime = (int) (Time.now() - startTime); + if (LOG.isDebugEnabled()) { + String msg = "Served: " + methodName + " queueTime= " + qTime + + " procesingTime= " + processingTime; + if (exception != null) { + msg += " exception= " + exception.getClass().getSimpleName(); + } + LOG.debug(msg); + } + String detailedMetricsName = (exception == null) ? + methodName : + exception.getClass().getSimpleName(); + server.rpcMetrics.addRpcQueueTime(qTime); + server.rpcMetrics.addRpcProcessingTime(processingTime); + server.rpcDetailedMetrics.addProcessingTime(detailedMetricsName, + processingTime); } return new RpcResponseWrapper(result); } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java index 0f11c97c9eb..24dd0c21b82 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java @@ -355,8 +355,8 @@ public abstract class Server { private int readThreads; // number of read threads private int readerPendingConnectionQueue; // number of connections to queue per read thread private Class rpcRequestClass; // class used for deserializing the rpc request - protected RpcMetrics rpcMetrics; - protected RpcDetailedMetrics rpcDetailedMetrics; + final protected RpcMetrics rpcMetrics; + final protected RpcDetailedMetrics rpcDetailedMetrics; private Configuration conf; private String portRangeConfig = null; @@ -2494,12 +2494,8 @@ public abstract class Server { listener.doStop(); responder.interrupt(); notifyAll(); - if (this.rpcMetrics != null) { - this.rpcMetrics.shutdown(); - } - if (this.rpcDetailedMetrics != null) { - this.rpcDetailedMetrics.shutdown(); - } + this.rpcMetrics.shutdown(); + this.rpcDetailedMetrics.shutdown(); } /** Wait for the server to be stopped. diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java index 34823b34d1f..04ab4dc2699 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java @@ -471,37 +471,29 @@ public class WritableRpcEngine implements RpcEngine { // Invoke the protocol method + long startTime = Time.now(); + int qTime = (int) (startTime-receivedTime); + Exception exception = null; try { - long startTime = Time.now(); - Method method = + Method method = protocolImpl.protocolClass.getMethod(call.getMethodName(), call.getParameterClasses()); method.setAccessible(true); server.rpcDetailedMetrics.init(protocolImpl.protocolClass); Object value = method.invoke(protocolImpl.protocolImpl, call.getParameters()); - int processingTime = (int) (Time.now() - startTime); - int qTime = (int) (startTime-receivedTime); - if (LOG.isDebugEnabled()) { - LOG.debug("Served: " + call.getMethodName() + - " queueTime= " + qTime + - " procesingTime= " + processingTime); - } - server.rpcMetrics.addRpcQueueTime(qTime); - server.rpcMetrics.addRpcProcessingTime(processingTime); - server.rpcDetailedMetrics.addProcessingTime(call.getMethodName(), - processingTime); if (server.verbose) log("Return: "+value); - return new ObjectWritable(method.getReturnType(), value); } catch (InvocationTargetException e) { Throwable target = e.getTargetException(); if (target instanceof IOException) { + exception = (IOException)target; throw (IOException)target; } else { IOException ioe = new IOException(target.toString()); ioe.setStackTrace(target.getStackTrace()); + exception = ioe; throw ioe; } } catch (Throwable e) { @@ -510,8 +502,27 @@ public class WritableRpcEngine implements RpcEngine { } IOException ioe = new IOException(e.toString()); ioe.setStackTrace(e.getStackTrace()); + exception = ioe; throw ioe; - } + } finally { + int processingTime = (int) (Time.now() - startTime); + if (LOG.isDebugEnabled()) { + String msg = "Served: " + call.getMethodName() + + " queueTime= " + qTime + + " procesingTime= " + processingTime; + if (exception != null) { + msg += " exception= " + exception.getClass().getSimpleName(); + } + LOG.debug(msg); + } + String detailedMetricsName = (exception == null) ? + call.getMethodName() : + exception.getClass().getSimpleName(); + server.rpcMetrics.addRpcQueueTime(qTime); + server.rpcMetrics.addRpcProcessingTime(processingTime); + server.rpcDetailedMetrics.addProcessingTime(detailedMetricsName, + processingTime); + } } } } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java index 45499f5c98f..dfbc91c43a6 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java @@ -496,6 +496,8 @@ public class TestRPC { caught = true; } assertTrue(caught); + rb = getMetrics(server.rpcDetailedMetrics.name()); + assertCounter("IOExceptionNumOps", 1L, rb); proxy.testServerGet();