From 0f15eb6a4032e090fa031c1687c138f55784d8c2 Mon Sep 17 00:00:00 2001 From: Arpit Agarwal Date: Tue, 8 Mar 2016 23:29:43 -0800 Subject: [PATCH] HADOOP-12903. IPC Server should allow suppressing exception logging by type, not log 'server too busy' messages. (Arpit Agarwal) --- .../java/org/apache/hadoop/ipc/Server.java | 134 +++++++++++++----- .../org/apache/hadoop/ipc/TestServer.java | 75 ++++++++-- 2 files changed, 165 insertions(+), 44 deletions(-) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java index 20c993b8e5c..8ae2ef9ef43 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java @@ -139,8 +139,22 @@ public abstract class Server { private ExceptionsHandler exceptionsHandler = new ExceptionsHandler(); private Tracer tracer; + /** + * Add exception classes for which server won't log stack traces. + * + * @param exceptionClass exception classes + */ public void addTerseExceptions(Class... exceptionClass) { - exceptionsHandler.addTerseExceptions(exceptionClass); + exceptionsHandler.addTerseLoggingExceptions(exceptionClass); + } + + /** + * Add exception classes which server won't log at all. + * + * @param exceptionClass exception classes + */ + public void addSuppressedLoggingExceptions(Class... exceptionClass) { + exceptionsHandler.addSuppressedLoggingExceptions(exceptionClass); } /** @@ -148,29 +162,54 @@ public void addTerseExceptions(Class... exceptionClass) { * e.g., terse exception group for concise logging messages */ static class ExceptionsHandler { - private volatile Set terseExceptions = new HashSet(); + private volatile Set terseExceptions = new HashSet<>(); + private volatile Set suppressedExceptions = new HashSet<>(); /** - * Add exception class so server won't log its stack trace. - * Modifying the terseException through this method is thread safe. - * + * Add exception classes for which server won't log stack traces. + * Optimized for infrequent invocation. * @param exceptionClass exception classes */ - void addTerseExceptions(Class... exceptionClass) { + void addTerseLoggingExceptions(Class... exceptionClass) { + // Thread-safe replacement of terseExceptions. + terseExceptions = addExceptions(terseExceptions, exceptionClass); + } - // Make a copy of terseException for performing modification - final HashSet newSet = new HashSet(terseExceptions); + /** + * Add exception classes which server won't log at all. + * Optimized for infrequent invocation. + * @param exceptionClass exception classes + */ + void addSuppressedLoggingExceptions(Class... exceptionClass) { + // Thread-safe replacement of suppressedExceptions. + suppressedExceptions = addExceptions( + suppressedExceptions, exceptionClass); + } + + boolean isTerseLog(Class t) { + return terseExceptions.contains(t.toString()); + } + + boolean isSuppressedLog(Class t) { + return suppressedExceptions.contains(t.toString()); + } + + /** + * Return a new set containing all the exceptions in exceptionsSet + * and exceptionClass. + * @return + */ + private static Set addExceptions( + final Set exceptionsSet, Class[] exceptionClass) { + // Make a copy of the exceptionSet for performing modification + final HashSet newSet = new HashSet<>(exceptionsSet); // Add all class names into the HashSet for (Class name : exceptionClass) { newSet.add(name.toString()); } - // Replace terseException set - terseExceptions = Collections.unmodifiableSet(newSet); - } - boolean isTerse(Class t) { - return terseExceptions.contains(t.toString()); + return Collections.unmodifiableSet(newSet); } } @@ -878,7 +917,7 @@ void doAccept(SelectionKey key) throws InterruptedException, IOException, OutOf } void doRead(SelectionKey key) throws InterruptedException { - int count = 0; + int count; Connection c = (Connection)key.attachment(); if (c == null) { return; @@ -891,13 +930,17 @@ void doRead(SelectionKey key) throws InterruptedException { LOG.info(Thread.currentThread().getName() + ": readAndProcess caught InterruptedException", ieo); throw ieo; } catch (Exception e) { - // a WrappedRpcServerException is an exception that has been sent - // to the client, so the stacktrace is unnecessary; any other - // exceptions are unexpected internal server errors and thus the - // stacktrace should be logged - LOG.info(Thread.currentThread().getName() + ": readAndProcess from client " + - c.getHostAddress() + " threw exception [" + e + "]", - (e instanceof WrappedRpcServerException) ? null : e); + // Do not log WrappedRpcServerExceptionSuppressed. + if (!(e instanceof WrappedRpcServerExceptionSuppressed)) { + // A WrappedRpcServerException is an exception that has been sent + // to the client, so the stacktrace is unnecessary; any other + // exceptions are unexpected internal server errors and thus the + // stacktrace should be logged. + LOG.info(Thread.currentThread().getName() + + ": readAndProcess from client " + c.getHostAddress() + + " threw exception [" + e + "]", + (e instanceof WrappedRpcServerException) ? null : e); + } count = -1; //so that the (count < 0) block is executed } if (count < 0) { @@ -1240,6 +1283,18 @@ public String toString() { } } + /** + * A WrappedRpcServerException that is suppressed altogether + * for the purposes of logging. + */ + private static class WrappedRpcServerExceptionSuppressed + extends WrappedRpcServerException { + public WrappedRpcServerExceptionSuppressed( + RpcErrorCodeProto errCode, IOException ioe) { + super(errCode, ioe); + } + } + /** Reads calls from a connection and queues them for handling. */ public class Connection { private boolean connectionHeaderRead = false; // connection header is read? @@ -2032,7 +2087,7 @@ private void queueRequestOrAskClientToBackOff(Call call) rpcMetrics.incrClientBackoff(); RetriableException retriableException = new RetriableException("Server is too busy."); - throw new WrappedRpcServerException( + throw new WrappedRpcServerExceptionSuppressed( RpcErrorCodeProto.ERROR_RPC_SERVER, retriableException); } } @@ -2227,18 +2282,7 @@ public Writable run() throws Exception { if (e instanceof UndeclaredThrowableException) { e = e.getCause(); } - String logMsg = Thread.currentThread().getName() + ", call " + call; - if (exceptionsHandler.isTerse(e.getClass())) { - // Don't log the whole stack trace. Way too noisy! - LOG.info(logMsg + ": " + e); - } else if (e instanceof RuntimeException || e instanceof Error) { - // These exception types indicate something is probably wrong - // on the server side, as opposed to just a normal exceptional - // result. - LOG.warn(logMsg, e); - } else { - LOG.info(logMsg, e); - } + logException(LOG, e, call); if (e instanceof RpcServerException) { RpcServerException rse = ((RpcServerException)e); returnStatus = rse.getRpcStatusProto(); @@ -2291,6 +2335,26 @@ public Writable run() throws Exception { } } + + @VisibleForTesting + void logException(Log logger, Throwable e, Call call) { + if (exceptionsHandler.isSuppressedLog(e.getClass())) { + return; // Log nothing. + } + + final String logMsg = Thread.currentThread().getName() + ", call " + call; + if (exceptionsHandler.isTerseLog(e.getClass())) { + // Don't log the whole stack trace. Way too noisy! + logger.info(logMsg + ": " + e); + } else if (e instanceof RuntimeException || e instanceof Error) { + // These exception types indicate something is probably wrong + // on the server side, as opposed to just a normal exceptional + // result. + logger.warn(logMsg, e); + } else { + logger.info(logMsg, e); + } + } protected Server(String bindAddress, int port, Class paramClass, int handlerCount, @@ -2396,7 +2460,7 @@ protected Server(String bindAddress, int port, saslPropsResolver = SaslPropertiesResolver.getInstance(conf); } - this.exceptionsHandler.addTerseExceptions(StandbyException.class); + this.exceptionsHandler.addTerseLoggingExceptions(StandbyException.class); } private RpcSaslProto buildNegotiateResponse(List authMethods) diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestServer.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestServer.java index 64dc4d4ffd7..afda5355da6 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestServer.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestServer.java @@ -19,13 +19,19 @@ package org.apache.hadoop.ipc; import static org.junit.Assert.*; +import static org.mockito.Matchers.*; +import static org.mockito.Mockito.*; import java.io.IOException; import java.net.BindException; import java.net.InetSocketAddress; import java.net.ServerSocket; +import org.apache.commons.logging.Log; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.ipc.Server.Call; import org.junit.Test; /** @@ -117,15 +123,66 @@ public void testBindError() throws Exception { } } - @Test - public void testExceptionsHandler() { - Server.ExceptionsHandler handler = new Server.ExceptionsHandler(); - handler.addTerseExceptions(IOException.class); - handler.addTerseExceptions(RpcServerException.class, IpcException.class); + static class TestException1 extends Exception { + } - assertTrue(handler.isTerse(IOException.class)); - assertTrue(handler.isTerse(RpcServerException.class)); - assertTrue(handler.isTerse(IpcException.class)); - assertFalse(handler.isTerse(RpcClientException.class)); + static class TestException2 extends Exception { + } + + static class TestException3 extends Exception { + } + + @Test (timeout=300000) + public void testLogExceptions() throws Exception { + final Configuration conf = new Configuration(); + final Call dummyCall = new Call(0, 0, null, null); + Log logger = mock(Log.class); + Server server = new Server("0.0.0.0", 0, LongWritable.class, 1, conf) { + @Override + public Writable call( + RPC.RpcKind rpcKind, String protocol, Writable param, + long receiveTime) throws Exception { + return null; + } + }; + server.addSuppressedLoggingExceptions(TestException1.class); + server.addTerseExceptions(TestException2.class); + + // Nothing should be logged for a suppressed exception. + server.logException(logger, new TestException1(), dummyCall); + verifyZeroInteractions(logger); + + // No stack trace should be logged for a terse exception. + server.logException(logger, new TestException2(), dummyCall); + verify(logger, times(1)).info(anyObject()); + + // Full stack trace should be logged for other exceptions. + final Throwable te3 = new TestException3(); + server.logException(logger, te3, dummyCall); + verify(logger, times(1)).info(anyObject(), eq(te3)); + } + + @Test + public void testExceptionsHandlerTerse() { + Server.ExceptionsHandler handler = new Server.ExceptionsHandler(); + handler.addTerseLoggingExceptions(IOException.class); + handler.addTerseLoggingExceptions(RpcServerException.class, IpcException.class); + + assertTrue(handler.isTerseLog(IOException.class)); + assertTrue(handler.isTerseLog(RpcServerException.class)); + assertTrue(handler.isTerseLog(IpcException.class)); + assertFalse(handler.isTerseLog(RpcClientException.class)); + } + + @Test + public void testExceptionsHandlerSuppressed() { + Server.ExceptionsHandler handler = new Server.ExceptionsHandler(); + handler.addSuppressedLoggingExceptions(IOException.class); + handler.addSuppressedLoggingExceptions(RpcServerException.class, IpcException.class); + + assertTrue(handler.isSuppressedLog(IOException.class)); + assertTrue(handler.isSuppressedLog(RpcServerException.class)); + assertTrue(handler.isSuppressedLog(IpcException.class)); + assertFalse(handler.isSuppressedLog(RpcClientException.class)); } }