Log and track open/close of transport connections (#60297)

Transport connections between nodes remain in place until one or other
node shuts down or the connection is disrupted by a flaky network.
Today it is very difficult to demonstrate that transient failures and
cluster instability are caused by the network even though this is often
the case. In particular, transport connections open and close without
logging anything, even at `DEBUG` level, making it very hard to quantify
the scale of the problem or to correlate the networking problems with
external events.

This commit adds the missing `DEBUG`-level logging when transport
connections open and close, and also tracks the total number of
transport connections a node has opened as a measure of the stability of
the underlying network.
This commit is contained in:
David Turner 2020-07-28 16:58:00 +01:00
parent ee18538fd7
commit 9450ea08b4
7 changed files with 126 additions and 10 deletions

View File

@ -1818,7 +1818,15 @@ Contains transport statistics for the node.
======
`server_open`::
(integer)
Number of open TCP connections used for internal communication between nodes.
Current number of inbound TCP connections used for internal communication between nodes.
`total_outbound_connections`::
(integer)
The cumulative number of outbound transport connections that this node has
opened since it started. Each transport connection may comprise multiple TCP
connections but is only counted once in this statistic. Transport connections
are typically <<long-lived-connections,long-lived>> so this statistic should
remain constant in a stable cluster.
`rx_count`::
(integer)

View File

@ -25,14 +25,15 @@ import org.elasticsearch.ESNetty4IntegTestCase;
import org.elasticsearch.action.admin.cluster.node.hotthreads.NodesHotThreadsRequest;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.InternalTestCluster;
import org.elasticsearch.test.MockLogAppender;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.elasticsearch.transport.TcpTransport;
import org.elasticsearch.transport.TransportLogger;
import java.io.IOException;
@ESIntegTestCase.ClusterScope(numDataNodes = 2)
@TestLogging(
value = "org.elasticsearch.transport.netty4.ESLoggingHandler:trace,org.elasticsearch.transport.TransportLogger:trace",
reason = "to ensure we log network events on TRACE level")
public class ESLoggingHandlerIT extends ESNetty4IntegTestCase {
private MockLogAppender appender;
@ -42,17 +43,22 @@ public class ESLoggingHandlerIT extends ESNetty4IntegTestCase {
appender = new MockLogAppender();
Loggers.addAppender(LogManager.getLogger(ESLoggingHandler.class), appender);
Loggers.addAppender(LogManager.getLogger(TransportLogger.class), appender);
Loggers.addAppender(LogManager.getLogger(TcpTransport.class), appender);
appender.start();
}
public void tearDown() throws Exception {
Loggers.removeAppender(LogManager.getLogger(ESLoggingHandler.class), appender);
Loggers.removeAppender(LogManager.getLogger(TransportLogger.class), appender);
Loggers.removeAppender(LogManager.getLogger(TcpTransport.class), appender);
appender.stop();
super.tearDown();
}
public void testLoggingHandler() throws IllegalAccessException {
@TestLogging(
value = "org.elasticsearch.transport.netty4.ESLoggingHandler:trace,org.elasticsearch.transport.TransportLogger:trace",
reason = "to ensure we log network events on TRACE level")
public void testLoggingHandler() {
final String writePattern =
".*\\[length: \\d+" +
", request id: \\d+" +
@ -86,4 +92,18 @@ public class ESLoggingHandlerIT extends ESNetty4IntegTestCase {
appender.assertAllExpectationsMatched();
}
@TestLogging(value = "org.elasticsearch.transport.TcpTransport:DEBUG", reason = "to ensure we log connection events on DEBUG level")
public void testConnectionLogging() throws IOException {
appender.addExpectation(new MockLogAppender.PatternSeenEventExpectation("open connection log",
TcpTransport.class.getCanonicalName(), Level.DEBUG,
".*opened transport connection \\[[1-9][0-9]*\\] to .*"));
appender.addExpectation(new MockLogAppender.PatternSeenEventExpectation("close connection log",
TcpTransport.class.getCanonicalName(), Level.DEBUG,
".*closed transport connection \\[[1-9][0-9]*\\] to .* with age \\[[0-9]+ms\\].*"));
final String nodeName = internalCluster().startNode();
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(nodeName));
appender.assertAllExpectationsMatched();
}
}

View File

@ -25,12 +25,15 @@ import org.elasticsearch.NioIntegTestCase;
import org.elasticsearch.action.admin.cluster.node.hotthreads.NodesHotThreadsRequest;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.InternalTestCluster;
import org.elasticsearch.test.MockLogAppender;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.elasticsearch.transport.TcpTransport;
import org.elasticsearch.transport.TransportLogger;
import java.io.IOException;
@ESIntegTestCase.ClusterScope(numDataNodes = 2)
@TestLogging(value = "org.elasticsearch.transport.TransportLogger:trace", reason = "to ensure we log network events on TRACE level")
public class NioTransportLoggingIT extends NioIntegTestCase {
private MockLogAppender appender;
@ -39,16 +42,19 @@ public class NioTransportLoggingIT extends NioIntegTestCase {
super.setUp();
appender = new MockLogAppender();
Loggers.addAppender(LogManager.getLogger(TransportLogger.class), appender);
Loggers.addAppender(LogManager.getLogger(TcpTransport.class), appender);
appender.start();
}
public void tearDown() throws Exception {
Loggers.removeAppender(LogManager.getLogger(TransportLogger.class), appender);
Loggers.removeAppender(LogManager.getLogger(TcpTransport.class), appender);
appender.stop();
super.tearDown();
}
public void testLoggingHandler() throws IllegalAccessException {
@TestLogging(value = "org.elasticsearch.transport.TransportLogger:trace", reason = "to ensure we log network events on TRACE level")
public void testLoggingHandler() {
final String writePattern =
".*\\[length: \\d+" +
", request id: \\d+" +
@ -78,4 +84,18 @@ public class NioTransportLoggingIT extends NioIntegTestCase {
appender.assertAllExpectationsMatched();
}
@TestLogging(value = "org.elasticsearch.transport.TcpTransport:DEBUG", reason = "to ensure we log connection events on DEBUG level")
public void testConnectionLogging() throws IOException {
appender.addExpectation(new MockLogAppender.PatternSeenEventExpectation("open connection log",
TcpTransport.class.getCanonicalName(), Level.DEBUG,
".*opened transport connection \\[[1-9][0-9]*\\] to .*"));
appender.addExpectation(new MockLogAppender.PatternSeenEventExpectation("close connection log",
TcpTransport.class.getCanonicalName(), Level.DEBUG,
".*closed transport connection \\[[1-9][0-9]*\\] to .* with age \\[[0-9]+ms\\].*"));
final String nodeName = internalCluster().startNode();
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(nodeName));
appender.assertAllExpectationsMatched();
}
}

View File

@ -0,0 +1,24 @@
---
"Transport stats":
- skip:
version: " - 7.9.99"
reason: "total_outbound_connections field is not returned in prior versions"
features: [arbitrary_key]
- do:
nodes.info: {}
- set:
nodes._arbitrary_key_: node_id
- do:
nodes.stats:
metric: [ transport ]
- is_false: nodes.$node_id.store
- is_true: nodes.$node_id.transport
- gte: { nodes.$node_id.transport.server_open: 0 }
- gte: { nodes.$node_id.transport.total_outbound_connections: 0 }
- gte: { nodes.$node_id.transport.rx_count: 0 }
- gte: { nodes.$node_id.transport.tx_count: 0 }
- gte: { nodes.$node_id.transport.rx_size_in_bytes: 0 }
- gte: { nodes.$node_id.transport.tx_size_in_bytes: 0 }

View File

@ -82,6 +82,7 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@ -133,6 +134,8 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
private final ResponseHandlers responseHandlers = new ResponseHandlers();
private final RequestHandlers requestHandlers = new RequestHandlers();
private final AtomicLong outboundConnectionCount = new AtomicLong(); // also used as a correlation ID for open/close logs
public TcpTransport(Settings settings, Version version, ThreadPool threadPool, PageCacheRecycler pageCacheRecycler,
CircuitBreakerService circuitBreakerService, NamedWriteableRegistry namedWriteableRegistry,
NetworkService networkService) {
@ -845,7 +848,8 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
final long messagesSent = statsTracker.getMessagesSent();
final long messagesReceived = statsTracker.getMessagesReceived();
final long bytesRead = statsTracker.getBytesRead();
return new TransportStats(acceptedChannels.size(), messagesReceived, bytesRead, messagesSent, bytesWritten);
return new TransportStats(acceptedChannels.size(), outboundConnectionCount.get(),
messagesReceived, bytesRead, messagesSent, bytesWritten);
}
/**
@ -943,6 +947,8 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
final TcpChannel handshakeChannel = channels.get(0);
try {
executeHandshake(node, handshakeChannel, connectionProfile, ActionListener.wrap(version -> {
final long connectionId = outboundConnectionCount.incrementAndGet();
logger.debug("opened transport connection [{}] to [{}] using channels [{}]", connectionId, node, channels);
NodeChannels nodeChannels = new NodeChannels(node, channels, connectionProfile, version);
long relativeMillisTime = threadPool.relativeTimeInMillis();
nodeChannels.channels.forEach(ch -> {
@ -951,6 +957,7 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
ch.addCloseListener(ActionListener.wrap(nodeChannels::close));
});
keepAlive.registerNodeConnection(nodeChannels.channels, connectionProfile);
nodeChannels.addCloseListener(new ChannelCloseLogger(node, connectionId, relativeMillisTime));
listener.onResponse(nodeChannels);
}, e -> closeAndFail(e instanceof ConnectTransportException ?
e : new ConnectTransportException(node, "general node connection failure", e))));
@ -983,4 +990,28 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
}
}
}
private class ChannelCloseLogger implements ActionListener<Void> {
private final DiscoveryNode node;
private final long connectionId;
private final long openTimeMillis;
ChannelCloseLogger(DiscoveryNode node, long connectionId, long openTimeMillis) {
this.node = node;
this.connectionId = connectionId;
this.openTimeMillis = openTimeMillis;
}
@Override
public void onResponse(Void ignored) {
long closeTimeMillis = threadPool.relativeTimeInMillis();
logger.debug("closed transport connection [{}] to [{}] with age [{}ms]", connectionId, node, closeTimeMillis - openTimeMillis);
}
@Override
public void onFailure(Exception e) {
assert false : e; // never called
}
}
}

View File

@ -19,6 +19,7 @@
package org.elasticsearch.transport;
import org.elasticsearch.Version;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
@ -32,13 +33,15 @@ import java.io.IOException;
public class TransportStats implements Writeable, ToXContentFragment {
private final long serverOpen;
private final long totalOutboundConnections;
private final long rxCount;
private final long rxSize;
private final long txCount;
private final long txSize;
public TransportStats(long serverOpen, long rxCount, long rxSize, long txCount, long txSize) {
public TransportStats(long serverOpen, long totalOutboundConnections, long rxCount, long rxSize, long txCount, long txSize) {
this.serverOpen = serverOpen;
this.totalOutboundConnections = totalOutboundConnections;
this.rxCount = rxCount;
this.rxSize = rxSize;
this.txCount = txCount;
@ -47,6 +50,11 @@ public class TransportStats implements Writeable, ToXContentFragment {
public TransportStats(StreamInput in) throws IOException {
serverOpen = in.readVLong();
if (in.getVersion().onOrAfter(Version.V_7_10_0)) {
totalOutboundConnections = in.readVLong();
} else {
totalOutboundConnections = 0L;
}
rxCount = in.readVLong();
rxSize = in.readVLong();
txCount = in.readVLong();
@ -56,6 +64,9 @@ public class TransportStats implements Writeable, ToXContentFragment {
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVLong(serverOpen);
if (out.getVersion().onOrAfter(Version.V_7_10_0)) {
out.writeVLong(totalOutboundConnections);
}
out.writeVLong(rxCount);
out.writeVLong(rxSize);
out.writeVLong(txCount);
@ -106,6 +117,7 @@ public class TransportStats implements Writeable, ToXContentFragment {
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject(Fields.TRANSPORT);
builder.field(Fields.SERVER_OPEN, serverOpen);
builder.field(Fields.TOTAL_OUTBOUND_CONNECTIONS, totalOutboundConnections);
builder.field(Fields.RX_COUNT, rxCount);
builder.humanReadableField(Fields.RX_SIZE_IN_BYTES, Fields.RX_SIZE, new ByteSizeValue(rxSize));
builder.field(Fields.TX_COUNT, txCount);
@ -117,6 +129,7 @@ public class TransportStats implements Writeable, ToXContentFragment {
static final class Fields {
static final String TRANSPORT = "transport";
static final String SERVER_OPEN = "server_open";
static final String TOTAL_OUTBOUND_CONNECTIONS = "total_outbound_connections";
static final String RX_COUNT = "rx_count";
static final String RX_SIZE = "rx_size";
static final String RX_SIZE_IN_BYTES = "rx_size_in_bytes";

View File

@ -442,7 +442,7 @@ public class NodeStatsTests extends ESTestCase {
fsInfo = new FsInfo(randomNonNegativeLong(), ioStats, paths);
}
TransportStats transportStats = frequently() ? new TransportStats(randomNonNegativeLong(), randomNonNegativeLong(),
randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong()) : null;
randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong()) : null;
HttpStats httpStats = frequently() ? new HttpStats(randomNonNegativeLong(), randomNonNegativeLong()) : null;
AllCircuitBreakerStats allCircuitBreakerStats = null;
if (frequently()) {