diff --git a/server/src/main/java/org/elasticsearch/transport/TcpTransport.java b/server/src/main/java/org/elasticsearch/transport/TcpTransport.java index 57a50936eb3..a2396532e49 100644 --- a/server/src/main/java/org/elasticsearch/transport/TcpTransport.java +++ b/server/src/main/java/org/elasticsearch/transport/TcpTransport.java @@ -34,6 +34,7 @@ import org.elasticsearch.common.breaker.CircuitBreaker; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.component.AbstractLifecycleComponent; +import org.elasticsearch.common.component.Lifecycle; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.metrics.MeanMetric; @@ -577,6 +578,11 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements } public void onException(TcpChannel channel, Exception e) { + handleException(channel, e, lifecycle, outboundHandler); + } + + // exposed for tests + static void handleException(TcpChannel channel, Exception e, Lifecycle lifecycle, OutboundHandler outboundHandler) { if (!lifecycle.started()) { // just close and ignore - we are already stopped and just need to make sure we release all resources CloseableChannel.closeChannel(channel); @@ -584,20 +590,20 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements } if (isCloseConnectionException(e)) { - logger.trace(() -> new ParameterizedMessage( + logger.debug(() -> new ParameterizedMessage( "close connection exception caught on transport layer [{}], disconnecting from relevant node", channel), e); // close the channel, which will cause a node to be disconnected if relevant CloseableChannel.closeChannel(channel); } else if (isConnectException(e)) { - logger.trace(() -> new ParameterizedMessage("connect exception caught on transport layer [{}]", channel), e); + logger.debug(() -> new ParameterizedMessage("connect exception caught on transport layer [{}]", channel), e); // close the channel as safe measure, which will cause a node to be disconnected if relevant CloseableChannel.closeChannel(channel); } else if (e instanceof BindException) { - logger.trace(() -> new ParameterizedMessage("bind exception caught on transport layer [{}]", channel), e); + logger.debug(() -> new ParameterizedMessage("bind exception caught on transport layer [{}]", channel), e); // close the channel as safe measure, which will cause a node to be disconnected if relevant CloseableChannel.closeChannel(channel); } else if (e instanceof CancelledKeyException) { - logger.trace(() -> new ParameterizedMessage( + logger.debug(() -> new ParameterizedMessage( "cancelled key exception caught on transport layer [{}], disconnecting from relevant node", channel), e); // close the channel as safe measure, which will cause a node to be disconnected if relevant CloseableChannel.closeChannel(channel); @@ -619,7 +625,7 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements protected void onServerException(TcpServerChannel channel, Exception e) { if (e instanceof BindException) { - logger.trace(() -> new ParameterizedMessage("bind exception from server channel caught on transport layer [{}]", channel), e); + logger.debug(() -> new ParameterizedMessage("bind exception from server channel caught on transport layer [{}]", channel), e); } else { logger.error(new ParameterizedMessage("exception from server channel caught on transport layer [{}]", channel), e); } @@ -817,7 +823,7 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements */ public static class HttpRequestOnTransportException extends ElasticsearchException { - private HttpRequestOnTransportException(String msg) { + HttpRequestOnTransportException(String msg) { super(msg); } diff --git a/server/src/test/java/org/elasticsearch/transport/TcpTransportTests.java b/server/src/test/java/org/elasticsearch/transport/TcpTransportTests.java index 136cd250f85..fd346177d42 100644 --- a/server/src/test/java/org/elasticsearch/transport/TcpTransportTests.java +++ b/server/src/test/java/org/elasticsearch/transport/TcpTransportTests.java @@ -19,26 +19,41 @@ package org.elasticsearch.transport; +import org.apache.logging.log4j.Level; +import org.apache.logging.log4j.LogManager; +import org.elasticsearch.ElasticsearchException; import org.elasticsearch.Version; +import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.component.Lifecycle; import org.elasticsearch.common.io.stream.BytesStreamOutput; +import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.network.NetworkUtils; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.TransportAddress; +import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.MockPageCacheRecycler; import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.MockLogAppender; +import org.elasticsearch.test.junit.annotations.TestLogging; import org.elasticsearch.threadpool.TestThreadPool; +import org.elasticsearch.threadpool.ThreadPool; import org.hamcrest.Matcher; import java.io.IOException; import java.io.StreamCorruptedException; +import java.net.BindException; import java.net.InetSocketAddress; +import java.nio.channels.CancelledKeyException; +import java.nio.channels.ClosedChannelException; import java.util.Collections; +import java.util.concurrent.TimeUnit; import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.nullValue; import static org.hamcrest.core.IsInstanceOf.instanceOf; /** Unit tests for {@link TcpTransport} */ @@ -354,4 +369,83 @@ public class TcpTransportTests extends ESTestCase { "(not HTTP port) of a remote node is specified in the configuration", ex.getMessage()); } } + + @TestLogging(reason = "testing logging", value = "org.elasticsearch.transport.TcpTransport:DEBUG") + public void testExceptionHandling() throws IllegalAccessException { + testExceptionHandling(false, new ElasticsearchException("simulated"), true, + new MockLogAppender.UnseenEventExpectation("message", "org.elasticsearch.transport.TcpTransport", Level.ERROR, "*"), + new MockLogAppender.UnseenEventExpectation("message", "org.elasticsearch.transport.TcpTransport", Level.WARN, "*"), + new MockLogAppender.UnseenEventExpectation("message", "org.elasticsearch.transport.TcpTransport", Level.INFO, "*"), + new MockLogAppender.UnseenEventExpectation("message", "org.elasticsearch.transport.TcpTransport", Level.DEBUG, "*")); + testExceptionHandling(new ElasticsearchException("simulated"), + new MockLogAppender.SeenEventExpectation("message", "org.elasticsearch.transport.TcpTransport", + Level.WARN, "exception caught on transport layer [*], closing connection")); + testExceptionHandling(new ClosedChannelException(), + new MockLogAppender.SeenEventExpectation("message", "org.elasticsearch.transport.TcpTransport", + Level.DEBUG, "close connection exception caught on transport layer [*], disconnecting from relevant node")); + testExceptionHandling(new ElasticsearchException("Connection reset"), + new MockLogAppender.SeenEventExpectation("message", "org.elasticsearch.transport.TcpTransport", + Level.DEBUG, "close connection exception caught on transport layer [*], disconnecting from relevant node")); + testExceptionHandling(new BindException(), + new MockLogAppender.SeenEventExpectation("message", "org.elasticsearch.transport.TcpTransport", + Level.DEBUG, "bind exception caught on transport layer [*]")); + testExceptionHandling(new CancelledKeyException(), + new MockLogAppender.SeenEventExpectation("message", "org.elasticsearch.transport.TcpTransport", + Level.DEBUG, "cancelled key exception caught on transport layer [*], disconnecting from relevant node")); + testExceptionHandling(true, new TcpTransport.HttpRequestOnTransportException("test"), false, + new MockLogAppender.UnseenEventExpectation("message", "org.elasticsearch.transport.TcpTransport", Level.ERROR, "*"), + new MockLogAppender.UnseenEventExpectation("message", "org.elasticsearch.transport.TcpTransport", Level.WARN, "*"), + new MockLogAppender.UnseenEventExpectation("message", "org.elasticsearch.transport.TcpTransport", Level.INFO, "*"), + new MockLogAppender.UnseenEventExpectation("message", "org.elasticsearch.transport.TcpTransport", Level.DEBUG, "*")); + testExceptionHandling(new StreamCorruptedException("simulated"), + new MockLogAppender.SeenEventExpectation("message", "org.elasticsearch.transport.TcpTransport", + Level.WARN, "simulated, [*], closing connection")); + } + + private void testExceptionHandling(Exception exception, + MockLogAppender.LoggingExpectation... expectations) throws IllegalAccessException { + testExceptionHandling(true, exception, true, expectations); + } + + private void testExceptionHandling(boolean startTransport, Exception exception, boolean expectClosed, + MockLogAppender.LoggingExpectation... expectations) throws IllegalAccessException { + final TestThreadPool testThreadPool = new TestThreadPool("test"); + MockLogAppender appender = new MockLogAppender(); + + try { + appender.start(); + + Loggers.addAppender(LogManager.getLogger(TcpTransport.class), appender); + for (MockLogAppender.LoggingExpectation expectation : expectations) { + appender.addExpectation(expectation); + } + + final Lifecycle lifecycle = new Lifecycle(); + + if (startTransport) { + lifecycle.moveToStarted(); + } + + final FakeTcpChannel channel = new FakeTcpChannel(); + final PlainActionFuture listener = new PlainActionFuture<>(); + channel.addCloseListener(listener); + + TcpTransport.handleException(channel, exception, lifecycle, + new OutboundHandler(randomAlphaOfLength(10), Version.CURRENT, testThreadPool, BigArrays.NON_RECYCLING_INSTANCE)); + + if (expectClosed) { + assertTrue(listener.isDone()); + assertThat(listener.actionGet(), nullValue()); + } else { + assertFalse(listener.isDone()); + } + + appender.assertAllExpectationsMatched(); + + } finally { + Loggers.removeAppender(LogManager.getLogger(TcpTransport.class), appender); + appender.stop(); + ThreadPool.terminate(testThreadPool, 30, TimeUnit.SECONDS); + } + } }