Die with dignity on the network layer
When a fatal error is thrown on the network layer, such an error never makes its way to the uncaught exception handler. This prevents the node from being torn down if an out of memory error or other fatal error is thrown while handling HTTP or transport traffic. This commit adds logic to ensure that such errors bubble their way up to the uncaught exception handler, even though Netty tries really hard to swallow everything. Relates #21720
This commit is contained in:
parent
c79371fd5b
commit
446037ccb8
|
@ -27,6 +27,7 @@ import io.netty.handler.codec.http.DefaultFullHttpRequest;
|
|||
import io.netty.handler.codec.http.FullHttpRequest;
|
||||
import org.elasticsearch.common.util.concurrent.ThreadContext;
|
||||
import org.elasticsearch.http.netty4.pipelining.HttpPipelinedRequest;
|
||||
import org.elasticsearch.transport.netty4.Netty4Utils;
|
||||
|
||||
@ChannelHandler.Sharable
|
||||
class Netty4HttpRequestHandler extends SimpleChannelInboundHandler<Object> {
|
||||
|
@ -72,6 +73,7 @@ class Netty4HttpRequestHandler extends SimpleChannelInboundHandler<Object> {
|
|||
|
||||
@Override
|
||||
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
|
||||
Netty4Utils.maybeDie(cause);
|
||||
serverTransport.exceptionCaught(ctx, cause);
|
||||
}
|
||||
|
||||
|
|
|
@ -53,9 +53,9 @@ import org.elasticsearch.common.settings.Setting;
|
|||
import org.elasticsearch.common.settings.Setting.Property;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.transport.BoundTransportAddress;
|
||||
import org.elasticsearch.common.transport.TransportAddress;
|
||||
import org.elasticsearch.common.transport.NetworkExceptionHelper;
|
||||
import org.elasticsearch.common.transport.PortsRange;
|
||||
import org.elasticsearch.common.transport.TransportAddress;
|
||||
import org.elasticsearch.common.unit.ByteSizeUnit;
|
||||
import org.elasticsearch.common.unit.ByteSizeValue;
|
||||
import org.elasticsearch.common.util.BigArrays;
|
||||
|
@ -578,6 +578,12 @@ public class Netty4HttpServerTransport extends AbstractLifecycleComponent implem
|
|||
ch.pipeline().addLast("handler", requestHandler);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
|
||||
Netty4Utils.maybeDie(cause);
|
||||
super.exceptionCaught(ctx, cause);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -80,6 +80,7 @@ final class Netty4MessageChannelHandler extends ChannelDuplexHandler {
|
|||
|
||||
@Override
|
||||
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
|
||||
Netty4Utils.maybeDie(cause);
|
||||
transport.exceptionCaught(ctx, cause);
|
||||
}
|
||||
|
||||
|
|
|
@ -508,6 +508,12 @@ public class Netty4Transport extends TcpTransport<Channel> {
|
|||
ch.pipeline().addLast("dispatcher", new Netty4MessageChannelHandler(Netty4Transport.this, ".client"));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
|
||||
Netty4Utils.maybeDie(cause);
|
||||
super.exceptionCaught(ctx, cause);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
protected class ServerChannelInitializer extends ChannelInitializer<Channel> {
|
||||
|
@ -526,6 +532,13 @@ public class Netty4Transport extends TcpTransport<Channel> {
|
|||
ch.pipeline().addLast("size", new Netty4SizeHeaderFrameDecoder());
|
||||
ch.pipeline().addLast("dispatcher", new Netty4MessageChannelHandler(Netty4Transport.this, name));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
|
||||
Netty4Utils.maybeDie(cause);
|
||||
super.exceptionCaught(ctx, cause);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -16,6 +16,7 @@
|
|||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.transport.netty4;
|
||||
|
||||
import io.netty.buffer.ByteBuf;
|
||||
|
@ -28,9 +29,13 @@ import io.netty.util.internal.logging.InternalLoggerFactory;
|
|||
import org.apache.lucene.util.BytesRef;
|
||||
import org.apache.lucene.util.BytesRefIterator;
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
import org.elasticsearch.common.logging.ESLoggerFactory;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.PrintWriter;
|
||||
import java.io.StringWriter;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
|
||||
|
@ -63,8 +68,7 @@ public class Netty4Utils {
|
|||
return ((ByteBufBytesReference) reference).toByteBuf();
|
||||
} else {
|
||||
final BytesRefIterator iterator = reference.iterator();
|
||||
// usually we have one, two, or three components
|
||||
// from the header, the message, and a buffer
|
||||
// usually we have one, two, or three components from the header, the message, and a buffer
|
||||
final List<ByteBuf> buffers = new ArrayList<>(3);
|
||||
try {
|
||||
BytesRef slice;
|
||||
|
@ -118,4 +122,31 @@ public class Netty4Utils {
|
|||
}
|
||||
}
|
||||
|
||||
public static void maybeDie(final Throwable cause) throws IOException {
|
||||
if (cause instanceof Error) {
|
||||
/*
|
||||
* Here be dragons. We want to rethrow this so that it bubbles up to the uncaught exception handler. Yet, Netty wraps too many
|
||||
* invocations of user-code in try/catch blocks that swallow all throwables. This means that a rethrow here will not bubble up
|
||||
* to where we want it to. So, we fork a thread and throw the exception from there where Netty can not get to it. We do not wrap
|
||||
* the exception so as to not lose the original cause during exit, so we give the thread a name based on the previous stack
|
||||
* frame so that at least we know where it came from (in case logging the current stack trace fails).
|
||||
*/
|
||||
try (
|
||||
final StringWriter sw = new StringWriter();
|
||||
final PrintWriter pw = new PrintWriter(sw)) {
|
||||
// try to log the current stack trace
|
||||
Arrays.stream(Thread.currentThread().getStackTrace()).skip(1).map(e -> "\tat " + e).forEach(pw::println);
|
||||
ESLoggerFactory.getLogger(Netty4Utils.class).error("fatal error on the network layer\n{}", sw.toString());
|
||||
} finally {
|
||||
final StackTraceElement previous = Thread.currentThread().getStackTrace()[2];
|
||||
new Thread(
|
||||
() -> {
|
||||
throw (Error) cause;
|
||||
},
|
||||
previous.getClassName() + "#" + previous.getMethodName())
|
||||
.start();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -16,6 +16,7 @@
|
|||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.http.netty4;
|
||||
|
||||
import io.netty.bootstrap.Bootstrap;
|
||||
|
|
Loading…
Reference in New Issue