Log exceptions in TcpTransport at DEBUG level (#51612)

When running Elasticsearch on a flaky network, we may see nodes leaving the
cluster with reason `disconnected`. It may be useful to the cluster
administrator to see the full exception that caused the disconnection, but this
is only available with `TRACE` level logging which commingles the details of
the problem with other messages that are not useful to end users.

This commit promotes logging of exceptions in `TcpTransport` from `TRACE` to
`DEBUG` to separate them from the truly `TRACE`-level messages.
This commit is contained in:
David Turner 2020-01-31 01:12:05 +00:00
parent 77b00fc0c0
commit 72ae0ca73f
2 changed files with 106 additions and 6 deletions

View File

@ -34,6 +34,7 @@ import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.component.Lifecycle;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.metrics.MeanMetric;
@ -577,6 +578,11 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
}
public void onException(TcpChannel channel, Exception e) {
handleException(channel, e, lifecycle, outboundHandler);
}
// exposed for tests
static void handleException(TcpChannel channel, Exception e, Lifecycle lifecycle, OutboundHandler outboundHandler) {
if (!lifecycle.started()) {
// just close and ignore - we are already stopped and just need to make sure we release all resources
CloseableChannel.closeChannel(channel);
@ -584,20 +590,20 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
}
if (isCloseConnectionException(e)) {
logger.trace(() -> new ParameterizedMessage(
logger.debug(() -> new ParameterizedMessage(
"close connection exception caught on transport layer [{}], disconnecting from relevant node", channel), e);
// close the channel, which will cause a node to be disconnected if relevant
CloseableChannel.closeChannel(channel);
} else if (isConnectException(e)) {
logger.trace(() -> new ParameterizedMessage("connect exception caught on transport layer [{}]", channel), e);
logger.debug(() -> new ParameterizedMessage("connect exception caught on transport layer [{}]", channel), e);
// close the channel as safe measure, which will cause a node to be disconnected if relevant
CloseableChannel.closeChannel(channel);
} else if (e instanceof BindException) {
logger.trace(() -> new ParameterizedMessage("bind exception caught on transport layer [{}]", channel), e);
logger.debug(() -> new ParameterizedMessage("bind exception caught on transport layer [{}]", channel), e);
// close the channel as safe measure, which will cause a node to be disconnected if relevant
CloseableChannel.closeChannel(channel);
} else if (e instanceof CancelledKeyException) {
logger.trace(() -> new ParameterizedMessage(
logger.debug(() -> new ParameterizedMessage(
"cancelled key exception caught on transport layer [{}], disconnecting from relevant node", channel), e);
// close the channel as safe measure, which will cause a node to be disconnected if relevant
CloseableChannel.closeChannel(channel);
@ -619,7 +625,7 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
protected void onServerException(TcpServerChannel channel, Exception e) {
if (e instanceof BindException) {
logger.trace(() -> new ParameterizedMessage("bind exception from server channel caught on transport layer [{}]", channel), e);
logger.debug(() -> new ParameterizedMessage("bind exception from server channel caught on transport layer [{}]", channel), e);
} else {
logger.error(new ParameterizedMessage("exception from server channel caught on transport layer [{}]", channel), e);
}
@ -817,7 +823,7 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
*/
public static class HttpRequestOnTransportException extends ElasticsearchException {
private HttpRequestOnTransportException(String msg) {
HttpRequestOnTransportException(String msg) {
super(msg);
}

View File

@ -19,26 +19,41 @@
package org.elasticsearch.transport;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.component.Lifecycle;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.network.NetworkUtils;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.MockPageCacheRecycler;
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.MockLogAppender;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import org.hamcrest.Matcher;
import java.io.IOException;
import java.io.StreamCorruptedException;
import java.net.BindException;
import java.net.InetSocketAddress;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.ClosedChannelException;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.core.IsInstanceOf.instanceOf;
/** Unit tests for {@link TcpTransport} */
@ -354,4 +369,83 @@ public class TcpTransportTests extends ESTestCase {
"(not HTTP port) of a remote node is specified in the configuration", ex.getMessage());
}
}
@TestLogging(reason = "testing logging", value = "org.elasticsearch.transport.TcpTransport:DEBUG")
public void testExceptionHandling() throws IllegalAccessException {
testExceptionHandling(false, new ElasticsearchException("simulated"), true,
new MockLogAppender.UnseenEventExpectation("message", "org.elasticsearch.transport.TcpTransport", Level.ERROR, "*"),
new MockLogAppender.UnseenEventExpectation("message", "org.elasticsearch.transport.TcpTransport", Level.WARN, "*"),
new MockLogAppender.UnseenEventExpectation("message", "org.elasticsearch.transport.TcpTransport", Level.INFO, "*"),
new MockLogAppender.UnseenEventExpectation("message", "org.elasticsearch.transport.TcpTransport", Level.DEBUG, "*"));
testExceptionHandling(new ElasticsearchException("simulated"),
new MockLogAppender.SeenEventExpectation("message", "org.elasticsearch.transport.TcpTransport",
Level.WARN, "exception caught on transport layer [*], closing connection"));
testExceptionHandling(new ClosedChannelException(),
new MockLogAppender.SeenEventExpectation("message", "org.elasticsearch.transport.TcpTransport",
Level.DEBUG, "close connection exception caught on transport layer [*], disconnecting from relevant node"));
testExceptionHandling(new ElasticsearchException("Connection reset"),
new MockLogAppender.SeenEventExpectation("message", "org.elasticsearch.transport.TcpTransport",
Level.DEBUG, "close connection exception caught on transport layer [*], disconnecting from relevant node"));
testExceptionHandling(new BindException(),
new MockLogAppender.SeenEventExpectation("message", "org.elasticsearch.transport.TcpTransport",
Level.DEBUG, "bind exception caught on transport layer [*]"));
testExceptionHandling(new CancelledKeyException(),
new MockLogAppender.SeenEventExpectation("message", "org.elasticsearch.transport.TcpTransport",
Level.DEBUG, "cancelled key exception caught on transport layer [*], disconnecting from relevant node"));
testExceptionHandling(true, new TcpTransport.HttpRequestOnTransportException("test"), false,
new MockLogAppender.UnseenEventExpectation("message", "org.elasticsearch.transport.TcpTransport", Level.ERROR, "*"),
new MockLogAppender.UnseenEventExpectation("message", "org.elasticsearch.transport.TcpTransport", Level.WARN, "*"),
new MockLogAppender.UnseenEventExpectation("message", "org.elasticsearch.transport.TcpTransport", Level.INFO, "*"),
new MockLogAppender.UnseenEventExpectation("message", "org.elasticsearch.transport.TcpTransport", Level.DEBUG, "*"));
testExceptionHandling(new StreamCorruptedException("simulated"),
new MockLogAppender.SeenEventExpectation("message", "org.elasticsearch.transport.TcpTransport",
Level.WARN, "simulated, [*], closing connection"));
}
private void testExceptionHandling(Exception exception,
MockLogAppender.LoggingExpectation... expectations) throws IllegalAccessException {
testExceptionHandling(true, exception, true, expectations);
}
private void testExceptionHandling(boolean startTransport, Exception exception, boolean expectClosed,
MockLogAppender.LoggingExpectation... expectations) throws IllegalAccessException {
final TestThreadPool testThreadPool = new TestThreadPool("test");
MockLogAppender appender = new MockLogAppender();
try {
appender.start();
Loggers.addAppender(LogManager.getLogger(TcpTransport.class), appender);
for (MockLogAppender.LoggingExpectation expectation : expectations) {
appender.addExpectation(expectation);
}
final Lifecycle lifecycle = new Lifecycle();
if (startTransport) {
lifecycle.moveToStarted();
}
final FakeTcpChannel channel = new FakeTcpChannel();
final PlainActionFuture<Void> listener = new PlainActionFuture<>();
channel.addCloseListener(listener);
TcpTransport.handleException(channel, exception, lifecycle,
new OutboundHandler(randomAlphaOfLength(10), Version.CURRENT, testThreadPool, BigArrays.NON_RECYCLING_INSTANCE));
if (expectClosed) {
assertTrue(listener.isDone());
assertThat(listener.actionGet(), nullValue());
} else {
assertFalse(listener.isDone());
}
appender.assertAllExpectationsMatched();
} finally {
Loggers.removeAppender(LogManager.getLogger(TcpTransport.class), appender);
appender.stop();
ThreadPool.terminate(testThreadPool, 30, TimeUnit.SECONDS);
}
}
}