Refactor Netty4Utils#maybeDie (#33021)

In our Netty layer we have had to take extra precautions against Netty
catching throwables which prevents them from reaching the uncaught
exception handler. This code has taken on additional uses in NIO layer
and now in the scheduler engine because there are other components in
stack traces that could catch throwables and suppress them from reaching
the uncaught exception handler. This commit is a simple cleanup of the
iterative evolution of this code to refactor all uses into a single
method in ExceptionsHelper.
This commit is contained in:
Jason Tedor 2018-08-22 10:18:07 -04:00 committed by GitHub
parent ead198bf2e
commit 67bfb765ee
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 76 additions and 106 deletions

View File

@ -21,11 +21,11 @@ package org.elasticsearch.http.netty4;
import io.netty.channel.Channel;
import io.netty.channel.ChannelPromise;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.common.concurrent.CompletableContext;
import org.elasticsearch.http.HttpChannel;
import org.elasticsearch.http.HttpResponse;
import org.elasticsearch.transport.netty4.Netty4Utils;
import java.net.InetSocketAddress;
@ -42,7 +42,7 @@ public class Netty4HttpChannel implements HttpChannel {
} else {
Throwable cause = f.cause();
if (cause instanceof Error) {
Netty4Utils.maybeDie(cause);
ExceptionsHelper.maybeDieOnAnotherThread(cause);
closeContext.completeExceptionally(new Exception(cause));
} else {
closeContext.completeExceptionally((Exception) cause);
@ -59,7 +59,7 @@ public class Netty4HttpChannel implements HttpChannel {
listener.onResponse(null);
} else {
final Throwable cause = f.cause();
Netty4Utils.maybeDie(cause);
ExceptionsHelper.maybeDieOnAnotherThread(cause);
if (cause instanceof Error) {
listener.onFailure(new Exception(cause));
} else {

View File

@ -27,7 +27,6 @@ import io.netty.handler.codec.http.DefaultFullHttpRequest;
import io.netty.handler.codec.http.FullHttpRequest;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.http.HttpPipelinedRequest;
import org.elasticsearch.transport.netty4.Netty4Utils;
@ChannelHandler.Sharable
class Netty4HttpRequestHandler extends SimpleChannelInboundHandler<HttpPipelinedRequest<FullHttpRequest>> {
@ -58,7 +57,7 @@ class Netty4HttpRequestHandler extends SimpleChannelInboundHandler<HttpPipelined
if (request.decoderResult().isFailure()) {
Throwable cause = request.decoderResult().cause();
if (cause instanceof Error) {
ExceptionsHelper.dieOnError(cause);
ExceptionsHelper.maybeDieOnAnotherThread(cause);
serverTransport.incomingRequestError(httpRequest, channel, new Exception(cause));
} else {
serverTransport.incomingRequestError(httpRequest, channel, (Exception) cause);
@ -74,7 +73,7 @@ class Netty4HttpRequestHandler extends SimpleChannelInboundHandler<HttpPipelined
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
Netty4Utils.maybeDie(cause);
ExceptionsHelper.maybeDieOnAnotherThread(cause);
Netty4HttpChannel channel = ctx.channel().attr(Netty4HttpServerTransport.HTTP_CHANNEL_KEY).get();
if (cause instanceof Error) {
serverTransport.onException(channel, new Exception(cause));

View File

@ -20,10 +20,10 @@
package org.elasticsearch.http.netty4;
import io.netty.channel.Channel;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.common.concurrent.CompletableContext;
import org.elasticsearch.http.HttpServerChannel;
import org.elasticsearch.transport.netty4.Netty4Utils;
import java.net.InetSocketAddress;
@ -40,7 +40,7 @@ public class Netty4HttpServerChannel implements HttpServerChannel {
} else {
Throwable cause = f.cause();
if (cause instanceof Error) {
Netty4Utils.maybeDie(cause);
ExceptionsHelper.maybeDieOnAnotherThread(cause);
closeContext.completeExceptionally(new Exception(cause));
} else {
closeContext.completeExceptionally((Exception) cause);

View File

@ -41,6 +41,7 @@ import io.netty.handler.codec.http.HttpResponseEncoder;
import io.netty.handler.timeout.ReadTimeoutException;
import io.netty.handler.timeout.ReadTimeoutHandler;
import io.netty.util.AttributeKey;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.network.CloseableChannel;
import org.elasticsearch.common.network.NetworkService;
@ -338,7 +339,7 @@ public class Netty4HttpServerTransport extends AbstractHttpServerTransport {
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
Netty4Utils.maybeDie(cause);
ExceptionsHelper.maybeDieOnAnotherThread(cause);
super.exceptionCaught(ctx, cause);
}
}
@ -354,7 +355,7 @@ public class Netty4HttpServerTransport extends AbstractHttpServerTransport {
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
Netty4Utils.maybeDie(cause);
ExceptionsHelper.maybeDieOnAnotherThread(cause);
Netty4HttpServerChannel httpServerChannel = ctx.channel().attr(HTTP_SERVER_CHANNEL_KEY).get();
if (cause instanceof Error) {
transport.onServerException(httpServerChannel, new Exception(cause));

View File

@ -68,7 +68,7 @@ final class Netty4MessageChannelHandler extends ChannelDuplexHandler {
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
Netty4Utils.maybeDie(cause);
ExceptionsHelper.maybeDieOnAnotherThread(cause);
final Throwable unwrapped = ExceptionsHelper.unwrap(cause, ElasticsearchException.class);
final Throwable newCause = unwrapped != null ? unwrapped : cause;
Netty4TcpChannel tcpChannel = ctx.channel().attr(Netty4Transport.CHANNEL_KEY).get();

View File

@ -22,6 +22,7 @@ package org.elasticsearch.transport.netty4;
import io.netty.channel.Channel;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPromise;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.concurrent.CompletableContext;
@ -45,7 +46,7 @@ public class Netty4TcpChannel implements TcpChannel {
} else {
Throwable cause = f.cause();
if (cause instanceof Error) {
Netty4Utils.maybeDie(cause);
ExceptionsHelper.maybeDieOnAnotherThread(cause);
closeContext.completeExceptionally(new Exception(cause));
} else {
closeContext.completeExceptionally((Exception) cause);
@ -97,7 +98,7 @@ public class Netty4TcpChannel implements TcpChannel {
listener.onResponse(null);
} else {
final Throwable cause = f.cause();
Netty4Utils.maybeDie(cause);
ExceptionsHelper.maybeDieOnAnotherThread(cause);
if (cause instanceof Error) {
listener.onFailure(new Exception(cause));
} else {

View File

@ -20,6 +20,7 @@
package org.elasticsearch.transport.netty4;
import io.netty.channel.Channel;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.common.concurrent.CompletableContext;
import org.elasticsearch.transport.TcpServerChannel;
@ -41,7 +42,7 @@ public class Netty4TcpServerChannel implements TcpServerChannel {
} else {
Throwable cause = f.cause();
if (cause instanceof Error) {
Netty4Utils.maybeDie(cause);
ExceptionsHelper.maybeDieOnAnotherThread(cause);
closeContext.completeExceptionally(new Exception(cause));
} else {
closeContext.completeExceptionally((Exception) cause);

View File

@ -38,6 +38,7 @@ import io.netty.util.AttributeKey;
import io.netty.util.concurrent.Future;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Supplier;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.SuppressForbidden;
@ -228,7 +229,7 @@ public class Netty4Transport extends TcpTransport {
ChannelFuture channelFuture = bootstrap.connect(address);
Channel channel = channelFuture.channel();
if (channel == null) {
Netty4Utils.maybeDie(channelFuture.cause());
ExceptionsHelper.maybeDieOnAnotherThread(channelFuture.cause());
throw new IOException(channelFuture.cause());
}
addClosedExceptionLogger(channel);
@ -242,7 +243,7 @@ public class Netty4Transport extends TcpTransport {
} else {
Throwable cause = f.cause();
if (cause instanceof Error) {
Netty4Utils.maybeDie(cause);
ExceptionsHelper.maybeDieOnAnotherThread(cause);
listener.onFailure(new Exception(cause));
} else {
listener.onFailure((Exception) cause);
@ -307,7 +308,7 @@ public class Netty4Transport extends TcpTransport {
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
Netty4Utils.maybeDie(cause);
ExceptionsHelper.maybeDieOnAnotherThread(cause);
super.exceptionCaught(ctx, cause);
}
}
@ -333,7 +334,7 @@ public class Netty4Transport extends TcpTransport {
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
Netty4Utils.maybeDie(cause);
ExceptionsHelper.maybeDieOnAnotherThread(cause);
super.exceptionCaught(ctx, cause);
}
}
@ -351,7 +352,7 @@ public class Netty4Transport extends TcpTransport {
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
Netty4Utils.maybeDie(cause);
ExceptionsHelper.maybeDieOnAnotherThread(cause);
Netty4TcpServerChannel serverChannel = ctx.channel().attr(SERVER_CHANNEL_KEY).get();
if (cause instanceof Error) {
onServerException(serverChannel, new Exception(cause));

View File

@ -27,20 +27,16 @@ import io.netty.channel.ChannelFuture;
import io.netty.util.NettyRuntime;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.BytesRefIterator;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.common.Booleans;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.logging.ESLoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Locale;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
public class Netty4Utils {
@ -161,34 +157,4 @@ public class Netty4Utils {
}
}
/**
* If the specified cause is an unrecoverable error, this method will rethrow the cause on a separate thread so that it can not be
* caught and bubbles up to the uncaught exception handler.
*
* @param cause the throwable to test
*/
public static void maybeDie(final Throwable cause) {
final Logger logger = ESLoggerFactory.getLogger(Netty4Utils.class);
final Optional<Error> maybeError = ExceptionsHelper.maybeError(cause, logger);
if (maybeError.isPresent()) {
/*
* Here be dragons. We want to rethrow this so that it bubbles up to the uncaught exception handler. Yet, Netty wraps too many
* invocations of user-code in try/catch blocks that swallow all throwables. This means that a rethrow here will not bubble up
* to where we want it to. So, we fork a thread and throw the exception from there where Netty can not get to it. We do not wrap
* the exception so as to not lose the original cause during exit.
*/
try {
// try to log the current stack trace
final String formatted = ExceptionsHelper.formatStackTrace(Thread.currentThread().getStackTrace());
logger.error("fatal error on the network layer\n{}", formatted);
} finally {
new Thread(
() -> {
throw maybeError.get();
})
.start();
}
}
}
}

View File

@ -139,7 +139,7 @@ public class HttpReadWriteHandler implements ReadWriteHandler {
if (request.decoderResult().isFailure()) {
Throwable cause = request.decoderResult().cause();
if (cause instanceof Error) {
ExceptionsHelper.dieOnError(cause);
ExceptionsHelper.maybeDieOnAnotherThread(cause);
transport.incomingRequestError(httpRequest, nioHttpChannel, new Exception(cause));
} else {
transport.incomingRequestError(httpRequest, nioHttpChannel, (Exception) cause);

View File

@ -73,7 +73,7 @@ public class NettyAdaptor implements AutoCloseable {
closeFuture.await();
if (closeFuture.isSuccess() == false) {
Throwable cause = closeFuture.cause();
ExceptionsHelper.dieOnError(cause);
ExceptionsHelper.maybeDieOnAnotherThread(cause);
throw (Exception) cause;
}
}
@ -84,7 +84,7 @@ public class NettyAdaptor implements AutoCloseable {
listener.accept(null, null);
} else {
final Throwable cause = f.cause();
ExceptionsHelper.dieOnError(cause);
ExceptionsHelper.maybeDieOnAnotherThread(cause);
assert cause instanceof Exception;
listener.accept(null, (Exception) cause);
}

View File

@ -223,7 +223,7 @@ public class NettyListener implements BiConsumer<Void, Exception>, ChannelPromis
biConsumer.accept(null, null);
} else {
if (cause instanceof Error) {
ExceptionsHelper.dieOnError(cause);
ExceptionsHelper.maybeDieOnAnotherThread(cause);
biConsumer.accept(null, new Exception(cause));
} else {
biConsumer.accept(null, (Exception) cause);

View File

@ -90,14 +90,14 @@ public class DieWithDignityIT extends ESRestTestCase {
final Iterator<String> it = lines.iterator();
boolean fatalErrorOnTheNetworkLayer = false;
boolean fatalError = false;
boolean fatalErrorInThreadExiting = false;
while (it.hasNext() && (fatalErrorOnTheNetworkLayer == false || fatalErrorInThreadExiting == false)) {
while (it.hasNext() && (fatalError == false || fatalErrorInThreadExiting == false)) {
final String line = it.next();
if (line.contains("fatal error on the network layer")) {
fatalErrorOnTheNetworkLayer = true;
} else if (line.matches(".*\\[ERROR\\]\\[o.e.b.ElasticsearchUncaughtExceptionHandler\\] \\[node-0\\]"
if (line.matches(".*\\[ERROR\\]\\[o\\.e\\.ExceptionsHelper\\s*\\] \\[node-0\\] fatal error")) {
fatalError = true;
} else if (line.matches(".*\\[ERROR\\]\\[o\\.e\\.b\\.ElasticsearchUncaughtExceptionHandler\\] \\[node-0\\]"
+ " fatal error in thread \\[Thread-\\d+\\], exiting$")) {
fatalErrorInThreadExiting = true;
assertTrue(it.hasNext());
@ -105,7 +105,7 @@ public class DieWithDignityIT extends ESRestTestCase {
}
}
assertTrue(fatalErrorOnTheNetworkLayer);
assertTrue(fatalError);
assertTrue(fatalErrorInThreadExiting);
}

View File

@ -136,42 +136,6 @@ public final class ExceptionsHelper {
return Arrays.stream(stackTrace).skip(1).map(e -> "\tat " + e).collect(Collectors.joining("\n"));
}
static final int MAX_ITERATIONS = 1024;
/**
* Unwrap the specified throwable looking for any suppressed errors or errors as a root cause of the specified throwable.
*
* @param cause the root throwable
*
* @return an optional error if one is found suppressed or a root cause in the tree rooted at the specified throwable
*/
public static Optional<Error> maybeError(final Throwable cause, final Logger logger) {
// early terminate if the cause is already an error
if (cause instanceof Error) {
return Optional.of((Error) cause);
}
final Queue<Throwable> queue = new LinkedList<>();
queue.add(cause);
int iterations = 0;
while (!queue.isEmpty()) {
iterations++;
if (iterations > MAX_ITERATIONS) {
logger.warn("giving up looking for fatal errors", cause);
break;
}
final Throwable current = queue.remove();
if (current instanceof Error) {
return Optional.of((Error) current);
}
Collections.addAll(queue, current.getSuppressed());
if (current.getCause() != null) {
queue.add(current.getCause());
}
}
return Optional.empty();
}
/**
* Rethrows the first exception in the list and adds all remaining to the suppressed list.
* If the given list is empty no exception is thrown
@ -243,13 +207,50 @@ public final class ExceptionsHelper {
return true;
}
static final int MAX_ITERATIONS = 1024;
/**
* Unwrap the specified throwable looking for any suppressed errors or errors as a root cause of the specified throwable.
*
* @param cause the root throwable
* @return an optional error if one is found suppressed or a root cause in the tree rooted at the specified throwable
*/
public static Optional<Error> maybeError(final Throwable cause, final Logger logger) {
// early terminate if the cause is already an error
if (cause instanceof Error) {
return Optional.of((Error) cause);
}
final Queue<Throwable> queue = new LinkedList<>();
queue.add(cause);
int iterations = 0;
while (queue.isEmpty() == false) {
iterations++;
// this is a guard against deeply nested or circular chains of exceptions
if (iterations > MAX_ITERATIONS) {
logger.warn("giving up looking for fatal errors", cause);
break;
}
final Throwable current = queue.remove();
if (current instanceof Error) {
return Optional.of((Error) current);
}
Collections.addAll(queue, current.getSuppressed());
if (current.getCause() != null) {
queue.add(current.getCause());
}
}
return Optional.empty();
}
/**
* If the specified cause is an unrecoverable error, this method will rethrow the cause on a separate thread so that it can not be
* caught and bubbles up to the uncaught exception handler.
* caught and bubbles up to the uncaught exception handler. Note that the cause tree is examined for any {@link Error}. See
* {@link #maybeError(Throwable, Logger)} for the semantics.
*
* @param throwable the throwable to test
* @param throwable the throwable to possibly throw on another thread
*/
public static void dieOnError(Throwable throwable) {
public static void maybeDieOnAnotherThread(final Throwable throwable) {
ExceptionsHelper.maybeError(throwable, logger).ifPresent(error -> {
/*
* Here be dragons. We want to rethrow this so that it bubbles up to the uncaught exception handler. Yet, sometimes the stack

View File

@ -194,11 +194,11 @@ public class SchedulerEngine {
/*
* Allowing the throwable to escape here will lead to be it being caught in FutureTask#run and set as the outcome of this
* task; however, we never inspect the the outcomes of these scheduled tasks and so allowing the throwable to escape
* unhandled here could lead to us losing fatal errors. Instead, we rely on ExceptionsHelper#dieOnError to appropriately
* dispatch any error to the uncaught exception handler. We should never see an exception here as these do not escape from
* SchedulerEngine#notifyListeners.
* unhandled here could lead to us losing fatal errors. Instead, we rely on ExceptionsHelper#maybeThrowErrorOnAnotherThread
* to appropriately dispatch any error to the uncaught exception handler. We should never see an exception here as these do
* not escape from SchedulerEngine#notifyListeners.
*/
ExceptionsHelper.dieOnError(t);
ExceptionsHelper.maybeDieOnAnotherThread(t);
throw t;
}
scheduleNextRun(triggeredTime);