HADOOP-12903. IPC Server should allow suppressing exception logging by type, not log 'server too busy' messages. (Arpit Agarwal)
This commit is contained in:
parent
87c8005ad3
commit
2e040d31c7
|
@ -142,8 +142,22 @@ public abstract class Server {
|
||||||
private ExceptionsHandler exceptionsHandler = new ExceptionsHandler();
|
private ExceptionsHandler exceptionsHandler = new ExceptionsHandler();
|
||||||
private Tracer tracer;
|
private Tracer tracer;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Add exception classes for which server won't log stack traces.
|
||||||
|
*
|
||||||
|
* @param exceptionClass exception classes
|
||||||
|
*/
|
||||||
public void addTerseExceptions(Class<?>... exceptionClass) {
|
public void addTerseExceptions(Class<?>... exceptionClass) {
|
||||||
exceptionsHandler.addTerseExceptions(exceptionClass);
|
exceptionsHandler.addTerseLoggingExceptions(exceptionClass);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Add exception classes which server won't log at all.
|
||||||
|
*
|
||||||
|
* @param exceptionClass exception classes
|
||||||
|
*/
|
||||||
|
public void addSuppressedLoggingExceptions(Class<?>... exceptionClass) {
|
||||||
|
exceptionsHandler.addSuppressedLoggingExceptions(exceptionClass);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -151,29 +165,54 @@ public abstract class Server {
|
||||||
* e.g., terse exception group for concise logging messages
|
* e.g., terse exception group for concise logging messages
|
||||||
*/
|
*/
|
||||||
static class ExceptionsHandler {
|
static class ExceptionsHandler {
|
||||||
private volatile Set<String> terseExceptions = new HashSet<String>();
|
private volatile Set<String> terseExceptions = new HashSet<>();
|
||||||
|
private volatile Set<String> suppressedExceptions = new HashSet<>();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Add exception class so server won't log its stack trace.
|
* Add exception classes for which server won't log stack traces.
|
||||||
* Modifying the terseException through this method is thread safe.
|
* Optimized for infrequent invocation.
|
||||||
*
|
|
||||||
* @param exceptionClass exception classes
|
* @param exceptionClass exception classes
|
||||||
*/
|
*/
|
||||||
void addTerseExceptions(Class<?>... exceptionClass) {
|
void addTerseLoggingExceptions(Class<?>... exceptionClass) {
|
||||||
|
// Thread-safe replacement of terseExceptions.
|
||||||
|
terseExceptions = addExceptions(terseExceptions, exceptionClass);
|
||||||
|
}
|
||||||
|
|
||||||
// Make a copy of terseException for performing modification
|
/**
|
||||||
final HashSet<String> newSet = new HashSet<String>(terseExceptions);
|
* Add exception classes which server won't log at all.
|
||||||
|
* Optimized for infrequent invocation.
|
||||||
|
* @param exceptionClass exception classes
|
||||||
|
*/
|
||||||
|
void addSuppressedLoggingExceptions(Class<?>... exceptionClass) {
|
||||||
|
// Thread-safe replacement of suppressedExceptions.
|
||||||
|
suppressedExceptions = addExceptions(
|
||||||
|
suppressedExceptions, exceptionClass);
|
||||||
|
}
|
||||||
|
|
||||||
|
boolean isTerseLog(Class<?> t) {
|
||||||
|
return terseExceptions.contains(t.toString());
|
||||||
|
}
|
||||||
|
|
||||||
|
boolean isSuppressedLog(Class<?> t) {
|
||||||
|
return suppressedExceptions.contains(t.toString());
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Return a new set containing all the exceptions in exceptionsSet
|
||||||
|
* and exceptionClass.
|
||||||
|
* @return
|
||||||
|
*/
|
||||||
|
private static Set<String> addExceptions(
|
||||||
|
final Set<String> exceptionsSet, Class<?>[] exceptionClass) {
|
||||||
|
// Make a copy of the exceptionSet for performing modification
|
||||||
|
final HashSet<String> newSet = new HashSet<>(exceptionsSet);
|
||||||
|
|
||||||
// Add all class names into the HashSet
|
// Add all class names into the HashSet
|
||||||
for (Class<?> name : exceptionClass) {
|
for (Class<?> name : exceptionClass) {
|
||||||
newSet.add(name.toString());
|
newSet.add(name.toString());
|
||||||
}
|
}
|
||||||
// Replace terseException set
|
|
||||||
terseExceptions = Collections.unmodifiableSet(newSet);
|
|
||||||
}
|
|
||||||
|
|
||||||
boolean isTerse(Class<?> t) {
|
return Collections.unmodifiableSet(newSet);
|
||||||
return terseExceptions.contains(t.toString());
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -881,7 +920,7 @@ public abstract class Server {
|
||||||
}
|
}
|
||||||
|
|
||||||
void doRead(SelectionKey key) throws InterruptedException {
|
void doRead(SelectionKey key) throws InterruptedException {
|
||||||
int count = 0;
|
int count;
|
||||||
Connection c = (Connection)key.attachment();
|
Connection c = (Connection)key.attachment();
|
||||||
if (c == null) {
|
if (c == null) {
|
||||||
return;
|
return;
|
||||||
|
@ -894,13 +933,17 @@ public abstract class Server {
|
||||||
LOG.info(Thread.currentThread().getName() + ": readAndProcess caught InterruptedException", ieo);
|
LOG.info(Thread.currentThread().getName() + ": readAndProcess caught InterruptedException", ieo);
|
||||||
throw ieo;
|
throw ieo;
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
// a WrappedRpcServerException is an exception that has been sent
|
// Do not log WrappedRpcServerExceptionSuppressed.
|
||||||
// to the client, so the stacktrace is unnecessary; any other
|
if (!(e instanceof WrappedRpcServerExceptionSuppressed)) {
|
||||||
// exceptions are unexpected internal server errors and thus the
|
// A WrappedRpcServerException is an exception that has been sent
|
||||||
// stacktrace should be logged
|
// to the client, so the stacktrace is unnecessary; any other
|
||||||
LOG.info(Thread.currentThread().getName() + ": readAndProcess from client " +
|
// exceptions are unexpected internal server errors and thus the
|
||||||
c.getHostAddress() + " threw exception [" + e + "]",
|
// stacktrace should be logged.
|
||||||
(e instanceof WrappedRpcServerException) ? null : e);
|
LOG.info(Thread.currentThread().getName() +
|
||||||
|
": readAndProcess from client " + c.getHostAddress() +
|
||||||
|
" threw exception [" + e + "]",
|
||||||
|
(e instanceof WrappedRpcServerException) ? null : e);
|
||||||
|
}
|
||||||
count = -1; //so that the (count < 0) block is executed
|
count = -1; //so that the (count < 0) block is executed
|
||||||
}
|
}
|
||||||
if (count < 0) {
|
if (count < 0) {
|
||||||
|
@ -1243,6 +1286,18 @@ public abstract class Server {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A WrappedRpcServerException that is suppressed altogether
|
||||||
|
* for the purposes of logging.
|
||||||
|
*/
|
||||||
|
private static class WrappedRpcServerExceptionSuppressed
|
||||||
|
extends WrappedRpcServerException {
|
||||||
|
public WrappedRpcServerExceptionSuppressed(
|
||||||
|
RpcErrorCodeProto errCode, IOException ioe) {
|
||||||
|
super(errCode, ioe);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/** Reads calls from a connection and queues them for handling. */
|
/** Reads calls from a connection and queues them for handling. */
|
||||||
public class Connection {
|
public class Connection {
|
||||||
private boolean connectionHeaderRead = false; // connection header is read?
|
private boolean connectionHeaderRead = false; // connection header is read?
|
||||||
|
@ -2117,7 +2172,7 @@ public abstract class Server {
|
||||||
rpcMetrics.incrClientBackoff();
|
rpcMetrics.incrClientBackoff();
|
||||||
RetriableException retriableException =
|
RetriableException retriableException =
|
||||||
new RetriableException("Server is too busy.");
|
new RetriableException("Server is too busy.");
|
||||||
throw new WrappedRpcServerException(
|
throw new WrappedRpcServerExceptionSuppressed(
|
||||||
RpcErrorCodeProto.ERROR_RPC_SERVER, retriableException);
|
RpcErrorCodeProto.ERROR_RPC_SERVER, retriableException);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -2313,18 +2368,7 @@ public abstract class Server {
|
||||||
if (e instanceof UndeclaredThrowableException) {
|
if (e instanceof UndeclaredThrowableException) {
|
||||||
e = e.getCause();
|
e = e.getCause();
|
||||||
}
|
}
|
||||||
String logMsg = Thread.currentThread().getName() + ", call " + call;
|
logException(LOG, e, call);
|
||||||
if (exceptionsHandler.isTerse(e.getClass())) {
|
|
||||||
// Don't log the whole stack trace. Way too noisy!
|
|
||||||
LOG.info(logMsg + ": " + e);
|
|
||||||
} else if (e instanceof RuntimeException || e instanceof Error) {
|
|
||||||
// These exception types indicate something is probably wrong
|
|
||||||
// on the server side, as opposed to just a normal exceptional
|
|
||||||
// result.
|
|
||||||
LOG.warn(logMsg, e);
|
|
||||||
} else {
|
|
||||||
LOG.info(logMsg, e);
|
|
||||||
}
|
|
||||||
if (e instanceof RpcServerException) {
|
if (e instanceof RpcServerException) {
|
||||||
RpcServerException rse = ((RpcServerException)e);
|
RpcServerException rse = ((RpcServerException)e);
|
||||||
returnStatus = rse.getRpcStatusProto();
|
returnStatus = rse.getRpcStatusProto();
|
||||||
|
@ -2377,6 +2421,26 @@ public abstract class Server {
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
void logException(Log logger, Throwable e, Call call) {
|
||||||
|
if (exceptionsHandler.isSuppressedLog(e.getClass())) {
|
||||||
|
return; // Log nothing.
|
||||||
|
}
|
||||||
|
|
||||||
|
final String logMsg = Thread.currentThread().getName() + ", call " + call;
|
||||||
|
if (exceptionsHandler.isTerseLog(e.getClass())) {
|
||||||
|
// Don't log the whole stack trace. Way too noisy!
|
||||||
|
logger.info(logMsg + ": " + e);
|
||||||
|
} else if (e instanceof RuntimeException || e instanceof Error) {
|
||||||
|
// These exception types indicate something is probably wrong
|
||||||
|
// on the server side, as opposed to just a normal exceptional
|
||||||
|
// result.
|
||||||
|
logger.warn(logMsg, e);
|
||||||
|
} else {
|
||||||
|
logger.info(logMsg, e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
protected Server(String bindAddress, int port,
|
protected Server(String bindAddress, int port,
|
||||||
Class<? extends Writable> paramClass, int handlerCount,
|
Class<? extends Writable> paramClass, int handlerCount,
|
||||||
|
@ -2482,7 +2546,7 @@ public abstract class Server {
|
||||||
saslPropsResolver = SaslPropertiesResolver.getInstance(conf);
|
saslPropsResolver = SaslPropertiesResolver.getInstance(conf);
|
||||||
}
|
}
|
||||||
|
|
||||||
this.exceptionsHandler.addTerseExceptions(StandbyException.class);
|
this.exceptionsHandler.addTerseLoggingExceptions(StandbyException.class);
|
||||||
}
|
}
|
||||||
|
|
||||||
private RpcSaslProto buildNegotiateResponse(List<AuthMethod> authMethods)
|
private RpcSaslProto buildNegotiateResponse(List<AuthMethod> authMethods)
|
||||||
|
|
|
@ -19,13 +19,19 @@
|
||||||
package org.apache.hadoop.ipc;
|
package org.apache.hadoop.ipc;
|
||||||
|
|
||||||
import static org.junit.Assert.*;
|
import static org.junit.Assert.*;
|
||||||
|
import static org.mockito.Matchers.*;
|
||||||
|
import static org.mockito.Mockito.*;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.net.BindException;
|
import java.net.BindException;
|
||||||
import java.net.InetSocketAddress;
|
import java.net.InetSocketAddress;
|
||||||
import java.net.ServerSocket;
|
import java.net.ServerSocket;
|
||||||
|
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.io.LongWritable;
|
||||||
|
import org.apache.hadoop.io.Writable;
|
||||||
|
import org.apache.hadoop.ipc.Server.Call;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -117,15 +123,66 @@ public class TestServer {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
static class TestException1 extends Exception {
|
||||||
public void testExceptionsHandler() {
|
}
|
||||||
Server.ExceptionsHandler handler = new Server.ExceptionsHandler();
|
|
||||||
handler.addTerseExceptions(IOException.class);
|
|
||||||
handler.addTerseExceptions(RpcServerException.class, IpcException.class);
|
|
||||||
|
|
||||||
assertTrue(handler.isTerse(IOException.class));
|
static class TestException2 extends Exception {
|
||||||
assertTrue(handler.isTerse(RpcServerException.class));
|
}
|
||||||
assertTrue(handler.isTerse(IpcException.class));
|
|
||||||
assertFalse(handler.isTerse(RpcClientException.class));
|
static class TestException3 extends Exception {
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test (timeout=300000)
|
||||||
|
public void testLogExceptions() throws Exception {
|
||||||
|
final Configuration conf = new Configuration();
|
||||||
|
final Call dummyCall = new Call(0, 0, null, null);
|
||||||
|
Log logger = mock(Log.class);
|
||||||
|
Server server = new Server("0.0.0.0", 0, LongWritable.class, 1, conf) {
|
||||||
|
@Override
|
||||||
|
public Writable call(
|
||||||
|
RPC.RpcKind rpcKind, String protocol, Writable param,
|
||||||
|
long receiveTime) throws Exception {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
server.addSuppressedLoggingExceptions(TestException1.class);
|
||||||
|
server.addTerseExceptions(TestException2.class);
|
||||||
|
|
||||||
|
// Nothing should be logged for a suppressed exception.
|
||||||
|
server.logException(logger, new TestException1(), dummyCall);
|
||||||
|
verifyZeroInteractions(logger);
|
||||||
|
|
||||||
|
// No stack trace should be logged for a terse exception.
|
||||||
|
server.logException(logger, new TestException2(), dummyCall);
|
||||||
|
verify(logger, times(1)).info(anyObject());
|
||||||
|
|
||||||
|
// Full stack trace should be logged for other exceptions.
|
||||||
|
final Throwable te3 = new TestException3();
|
||||||
|
server.logException(logger, te3, dummyCall);
|
||||||
|
verify(logger, times(1)).info(anyObject(), eq(te3));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testExceptionsHandlerTerse() {
|
||||||
|
Server.ExceptionsHandler handler = new Server.ExceptionsHandler();
|
||||||
|
handler.addTerseLoggingExceptions(IOException.class);
|
||||||
|
handler.addTerseLoggingExceptions(RpcServerException.class, IpcException.class);
|
||||||
|
|
||||||
|
assertTrue(handler.isTerseLog(IOException.class));
|
||||||
|
assertTrue(handler.isTerseLog(RpcServerException.class));
|
||||||
|
assertTrue(handler.isTerseLog(IpcException.class));
|
||||||
|
assertFalse(handler.isTerseLog(RpcClientException.class));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testExceptionsHandlerSuppressed() {
|
||||||
|
Server.ExceptionsHandler handler = new Server.ExceptionsHandler();
|
||||||
|
handler.addSuppressedLoggingExceptions(IOException.class);
|
||||||
|
handler.addSuppressedLoggingExceptions(RpcServerException.class, IpcException.class);
|
||||||
|
|
||||||
|
assertTrue(handler.isSuppressedLog(IOException.class));
|
||||||
|
assertTrue(handler.isSuppressedLog(RpcServerException.class));
|
||||||
|
assertTrue(handler.isSuppressedLog(IpcException.class));
|
||||||
|
assertFalse(handler.isSuppressedLog(RpcClientException.class));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue