diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Utils.java b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Utils.java index d71e1ee9376..05295c1d4da 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Utils.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Utils.java @@ -27,6 +27,7 @@ import io.netty.channel.ChannelFuture; import io.netty.util.NettyRuntime; import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLoggerFactory; +import org.apache.logging.log4j.Logger; import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.BytesRefIterator; import org.elasticsearch.common.Booleans; @@ -37,8 +38,12 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; +import java.util.Collections; +import java.util.LinkedList; import java.util.List; import java.util.Locale; +import java.util.Optional; +import java.util.Queue; import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; @@ -167,7 +172,8 @@ public class Netty4Utils { * @param cause the throwable to test */ public static void maybeDie(final Throwable cause) { - if (cause instanceof Error) { + final Optional maybeError = maybeError(cause); + if (maybeError.isPresent()) { /* * Here be dragons. We want to rethrow this so that it bubbles up to the uncaught exception handler. Yet, Netty wraps too many * invocations of user-code in try/catch blocks that swallow all throwables. This means that a rethrow here will not bubble up @@ -178,15 +184,52 @@ public class Netty4Utils { // try to log the current stack trace final StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace(); final String formatted = Arrays.stream(stackTrace).skip(1).map(e -> "\tat " + e).collect(Collectors.joining("\n")); - ESLoggerFactory.getLogger(Netty4Utils.class).error("fatal error on the network layer\n{}", formatted); + final Logger logger = ESLoggerFactory.getLogger(Netty4Utils.class); + logger.error("fatal error on the network layer\n{}", formatted); } finally { new Thread( () -> { - throw (Error) cause; + throw maybeError.get(); }) .start(); } } } + static final int MAX_ITERATIONS = 1024; + + /** + * Unwrap the specified throwable looking for any suppressed errors or errors as a root cause of the specified throwable. + * + * @param cause the root throwable + * + * @return an optional error if one is found suppressed or a root cause in the tree rooted at the specified throwable + */ + static Optional maybeError(final Throwable cause) { + // early terminate if the cause is already an error + if (cause instanceof Error) { + return Optional.of((Error) cause); + } + + final Queue queue = new LinkedList<>(); + queue.add(cause); + int iterations = 0; + while (!queue.isEmpty()) { + iterations++; + if (iterations > MAX_ITERATIONS) { + ESLoggerFactory.getLogger(Netty4Utils.class).warn("giving up looking for fatal errors on the network layer", cause); + break; + } + final Throwable current = queue.remove(); + if (current instanceof Error) { + return Optional.of((Error) current); + } + Collections.addAll(queue, current.getSuppressed()); + if (current.getCause() != null) { + queue.add(current.getCause()); + } + } + return Optional.empty(); + } + } diff --git a/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/Netty4UtilsTests.java b/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/Netty4UtilsTests.java index 8372a8540b8..43be6f0efdd 100644 --- a/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/Netty4UtilsTests.java +++ b/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/Netty4UtilsTests.java @@ -22,6 +22,7 @@ package org.elasticsearch.transport.netty4; import io.netty.buffer.ByteBuf; import io.netty.buffer.CompositeByteBuf; import io.netty.buffer.Unpooled; +import io.netty.handler.codec.DecoderException; import org.apache.lucene.util.BytesRef; import org.elasticsearch.common.bytes.AbstractBytesReferenceTestCase; import org.elasticsearch.common.bytes.BytesArray; @@ -32,6 +33,9 @@ import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; import org.elasticsearch.test.ESTestCase; import java.io.IOException; +import java.util.Optional; + +import static org.hamcrest.CoreMatchers.equalTo; public class Netty4UtilsTests extends ESTestCase { @@ -75,6 +79,60 @@ public class Netty4UtilsTests extends ESTestCase { assertArrayEquals(BytesReference.toBytes(ref), BytesReference.toBytes(bytesReference)); } + public void testMaybeError() { + final Error outOfMemoryError = new OutOfMemoryError(); + assertError(outOfMemoryError, outOfMemoryError); + + final DecoderException decoderException = new DecoderException(outOfMemoryError); + assertError(decoderException, outOfMemoryError); + + final Exception e = new Exception(); + e.addSuppressed(decoderException); + assertError(e, outOfMemoryError); + + final int depth = randomIntBetween(1, 16); + Throwable cause = new Exception(); + boolean fatal = false; + Error error = null; + for (int i = 0; i < depth; i++) { + final int length = randomIntBetween(1, 4); + for (int j = 0; j < length; j++) { + if (!fatal && rarely()) { + error = new Error(); + cause.addSuppressed(error); + fatal = true; + } else { + cause.addSuppressed(new Exception()); + } + } + if (!fatal && rarely()) { + cause = error = new Error(cause); + fatal = true; + } else { + cause = new Exception(cause); + } + } + if (fatal) { + assertError(cause, error); + } else { + assertFalse(Netty4Utils.maybeError(cause).isPresent()); + } + + assertFalse(Netty4Utils.maybeError(new Exception(new DecoderException())).isPresent()); + + Throwable chain = outOfMemoryError; + for (int i = 0; i < Netty4Utils.MAX_ITERATIONS; i++) { + chain = new Exception(chain); + } + assertFalse(Netty4Utils.maybeError(chain).isPresent()); + } + + private void assertError(final Throwable cause, final Error error) { + final Optional maybeError = Netty4Utils.maybeError(cause); + assertTrue(maybeError.isPresent()); + assertThat(maybeError.get(), equalTo(error)); + } + private BytesReference getRandomizedBytesReference(int length) throws IOException { // we know bytes stream output always creates a paged bytes reference, we use it to create randomized content ReleasableBytesStreamOutput out = new ReleasableBytesStreamOutput(length, bigarrays);