From f18b0d293c325a28e578df6a965a8881dcf7850d Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Fri, 16 Jun 2017 22:34:11 +0200 Subject: [PATCH] Move TransportStats accounting into TcpTransport (#25251) Today TcpTransport is the de-facto base-class for transport implementations. The need for all the callbacks we have in TransportServiceAdaptor are not necessary anymore since we can simply have the logic inside the base class itself. This change moves the stats metrics directly into TcpTransport removing the need for low level bytes send / received callbacks. --- .../elasticsearch/transport/TcpTransport.java | 76 +++++-- .../elasticsearch/transport/Transport.java | 7 +- .../transport/TransportService.java | 18 +- .../transport/TransportServiceAdapter.java | 4 - .../transport/FailAndRetryMockTransport.java | 11 +- .../cluster/NodeConnectionsServiceTests.java | 13 +- .../transport/TCPTransportTests.java | 2 +- .../netty4/Netty4MessageChannelHandler.java | 14 -- .../transport/netty4/Netty4Transport.java | 2 +- .../test/transport/CapturingTransport.java | 11 +- .../test/transport/MockTransportService.java | 11 +- .../AbstractSimpleTransportTestCase.java | 190 ++++++++++++++++++ .../transport/MockTcpTransport.java | 6 +- 13 files changed, 280 insertions(+), 85 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/transport/TcpTransport.java b/core/src/main/java/org/elasticsearch/transport/TcpTransport.java index 22aced389f8..31d871a2ae8 100644 --- a/core/src/main/java/org/elasticsearch/transport/TcpTransport.java +++ b/core/src/main/java/org/elasticsearch/transport/TcpTransport.java @@ -50,6 +50,7 @@ import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.lease.Releasables; import org.elasticsearch.common.metrics.CounterMetric; +import org.elasticsearch.common.metrics.MeanMetric; import org.elasticsearch.common.network.NetworkAddress; import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.network.NetworkUtils; @@ -181,6 +182,9 @@ public abstract class TcpTransport extends AbstractLifecycleComponent i private final CounterMetric numHandshakes = new CounterMetric(); private static final String HANDSHAKE_ACTION_NAME = "internal:tcp/handshake"; + private final MeanMetric readBytesMetric = new MeanMetric(); + private final MeanMetric transmittedBytesMetric = new MeanMetric(); + public TcpTransport(String transportName, Settings settings, ThreadPool threadPool, BigArrays bigArrays, CircuitBreakerService circuitBreakerService, NamedWriteableRegistry namedWriteableRegistry, NetworkService networkService) { @@ -300,14 +304,14 @@ public abstract class TcpTransport extends AbstractLifecycleComponent i DiscoveryNode node = entry.getKey(); NodeChannels channels = entry.getValue(); for (Channel channel : channels.getChannels()) { - internalSendMessage(channel, pingHeader, new NotifyOnceListener() { + internalSendMessage(channel, pingHeader, new SendMetricListener(pingHeader.length()) { @Override - public void innerOnResponse(Channel channel) { + protected void innerInnerOnResponse(Channel channel) { successfulPings.inc(); } @Override - public void innerOnFailure(Exception e) { + protected void innerOnFailure(Exception e) { if (isOpen(channel)) { logger.debug( (Supplier) () -> new ParameterizedMessage("[{}] failed to send ping transport message", node), e); @@ -984,9 +988,10 @@ public abstract class TcpTransport extends AbstractLifecycleComponent i } else if (e instanceof TcpTransport.HttpOnTransportException) { // in case we are able to return data, serialize the exception content and sent it back to the client if (isOpen(channel)) { - final NotifyOnceListener closeChannel = new NotifyOnceListener() { + BytesArray message = new BytesArray(e.getMessage().getBytes(StandardCharsets.UTF_8)); + final SendMetricListener closeChannel = new SendMetricListener(message.length()) { @Override - public void innerOnResponse(Channel channel) { + protected void innerInnerOnResponse(Channel channel) { try { closeChannels(Collections.singletonList(channel)); } catch (IOException e1) { @@ -995,7 +1000,7 @@ public abstract class TcpTransport extends AbstractLifecycleComponent i } @Override - public void innerOnFailure(Exception e) { + protected void innerOnFailure(Exception e) { try { closeChannels(Collections.singletonList(channel)); } catch (IOException e1) { @@ -1004,7 +1009,7 @@ public abstract class TcpTransport extends AbstractLifecycleComponent i } } }; - internalSendMessage(channel, new BytesArray(e.getMessage().getBytes(StandardCharsets.UTF_8)), closeChannel); + internalSendMessage(channel, message, closeChannel); } } else { logger.warn( @@ -1086,7 +1091,7 @@ public abstract class TcpTransport extends AbstractLifecycleComponent i final TransportRequestOptions finalOptions = options; // this might be called in a different thread SendListener onRequestSent = new SendListener(stream, - () -> transportServiceAdapter.onRequestSent(node, requestId, action, request, finalOptions)); + () -> transportServiceAdapter.onRequestSent(node, requestId, action, request, finalOptions), message.length()); internalSendMessage(targetChannel, message, onRequestSent); addedReleaseListener = true; } finally { @@ -1099,7 +1104,7 @@ public abstract class TcpTransport extends AbstractLifecycleComponent i /** * sends a message to the given channel, using the given callbacks. */ - private void internalSendMessage(Channel targetChannel, BytesReference message, NotifyOnceListener listener) { + private void internalSendMessage(Channel targetChannel, BytesReference message, SendMetricListener listener) { try { sendMessage(targetChannel, message, listener); } catch (Exception ex) { @@ -1131,9 +1136,10 @@ public abstract class TcpTransport extends AbstractLifecycleComponent i status = TransportStatus.setError(status); final BytesReference bytes = stream.bytes(); final BytesReference header = buildHeader(requestId, status, nodeVersion, bytes.length()); + CompositeBytesReference message = new CompositeBytesReference(header, bytes); SendListener onResponseSent = new SendListener(null, - () -> transportServiceAdapter.onResponseSent(requestId, action, error)); - internalSendMessage(channel, new CompositeBytesReference(header, bytes), onResponseSent); + () -> transportServiceAdapter.onResponseSent(requestId, action, error), message.length()); + internalSendMessage(channel, message, onResponseSent); } } @@ -1162,13 +1168,13 @@ public abstract class TcpTransport extends AbstractLifecycleComponent i } threadPool.getThreadContext().writeTo(stream); stream.setVersion(nodeVersion); - BytesReference reference = buildMessage(requestId, status, nodeVersion, response, stream); + BytesReference message = buildMessage(requestId, status, nodeVersion, response, stream); final TransportResponseOptions finalOptions = options; // this might be called in a different thread SendListener listener = new SendListener(stream, - () -> transportServiceAdapter.onResponseSent(requestId, action, response, finalOptions)); - internalSendMessage(channel, reference, listener); + () -> transportServiceAdapter.onResponseSent(requestId, action, response, finalOptions), message.length()); + internalSendMessage(channel, message, listener); addedReleaseListener = true; } finally { if (!addedReleaseListener) { @@ -1324,7 +1330,7 @@ public abstract class TcpTransport extends AbstractLifecycleComponent i public final void messageReceived(BytesReference reference, Channel channel, String profileName, InetSocketAddress remoteAddress, int messageLengthBytes) throws IOException { final int totalMessageSize = messageLengthBytes + TcpHeader.MARKER_BYTES_SIZE + TcpHeader.MESSAGE_LENGTH_SIZE; - transportServiceAdapter.addBytesReceived(totalMessageSize); + readBytesMetric.inc(totalMessageSize); // we have additional bytes to read, outside of the header boolean hasMessageBytesToRead = (totalMessageSize - TcpHeader.HEADER_SIZE) > 0; StreamInput streamIn = reference.streamInput(); @@ -1662,22 +1668,42 @@ public abstract class TcpTransport extends AbstractLifecycleComponent i } } - private final class SendListener extends NotifyOnceListener { + /** + * This listener increments the transmitted bytes metric on success. + */ + private abstract class SendMetricListener extends NotifyOnceListener { + private final long messageSize; + + private SendMetricListener(long messageSize) { + this.messageSize = messageSize; + } + + @Override + protected final void innerOnResponse(T object) { + transmittedBytesMetric.inc(messageSize); + innerInnerOnResponse(object); + } + + protected abstract void innerInnerOnResponse(T object); + } + + private final class SendListener extends SendMetricListener { private final Releasable optionalReleasable; private final Runnable transportAdaptorCallback; - private SendListener(Releasable optionalReleasable, Runnable transportAdaptorCallback) { + private SendListener(Releasable optionalReleasable, Runnable transportAdaptorCallback, long messageLength) { + super(messageLength); this.optionalReleasable = optionalReleasable; this.transportAdaptorCallback = transportAdaptorCallback; } @Override - public void innerOnResponse(Channel channel) { + protected void innerInnerOnResponse(Channel channel) { release(); } @Override - public void innerOnFailure(Exception e) { + protected void innerOnFailure(Exception e) { release(); } @@ -1701,4 +1727,16 @@ public abstract class TcpTransport extends AbstractLifecycleComponent i final int getNumConnectedNodes() { return connectedNodes.size(); } + + /** + * Returns count of currently open connections + */ + protected abstract long getNumOpenServerConnections(); + + @Override + public final TransportStats getStats() { + return new TransportStats( + getNumOpenServerConnections(), readBytesMetric.count(), readBytesMetric.sum(), transmittedBytesMetric.count(), + transmittedBytesMetric.sum()); + } } diff --git a/core/src/main/java/org/elasticsearch/transport/Transport.java b/core/src/main/java/org/elasticsearch/transport/Transport.java index a32289332ea..5d22e156d9d 100644 --- a/core/src/main/java/org/elasticsearch/transport/Transport.java +++ b/core/src/main/java/org/elasticsearch/transport/Transport.java @@ -75,11 +75,6 @@ public interface Transport extends LifecycleComponent { */ void disconnectFromNode(DiscoveryNode node); - /** - * Returns count of currently open connections - */ - long serverOpen(); - List getLocalAddresses(); default CircuitBreaker getInFlightRequestBreaker() { @@ -110,6 +105,8 @@ public interface Transport extends LifecycleComponent { */ Connection openConnection(DiscoveryNode node, ConnectionProfile profile) throws IOException; + TransportStats getStats(); + /** * A unidirectional connection to a {@link DiscoveryNode} */ diff --git a/core/src/main/java/org/elasticsearch/transport/TransportService.java b/core/src/main/java/org/elasticsearch/transport/TransportService.java index e5382e4e261..0a4745cda79 100644 --- a/core/src/main/java/org/elasticsearch/transport/TransportService.java +++ b/core/src/main/java/org/elasticsearch/transport/TransportService.java @@ -203,8 +203,6 @@ public class TransportService extends AbstractLifecycleComponent { @Override protected void doStart() { - adapter.rxMetric.clear(); - adapter.txMetric.clear(); transport.transportServiceAdapter(adapter); transport.start(); @@ -292,8 +290,7 @@ public class TransportService extends AbstractLifecycleComponent { } public TransportStats stats() { - return new TransportStats( - transport.serverOpen(), adapter.rxMetric.count(), adapter.rxMetric.sum(), adapter.txMetric.count(), adapter.txMetric.sum()); + return transport.getStats(); } public BoundTransportAddress boundAddress() { @@ -738,19 +735,6 @@ public class TransportService extends AbstractLifecycleComponent { protected class Adapter implements TransportServiceAdapter { - final MeanMetric rxMetric = new MeanMetric(); - final MeanMetric txMetric = new MeanMetric(); - - @Override - public void addBytesReceived(long size) { - rxMetric.inc(size); - } - - @Override - public void addBytesSent(long size) { - txMetric.inc(size); - } - @Override public void onRequestSent(DiscoveryNode node, long requestId, String action, TransportRequest request, TransportRequestOptions options) { diff --git a/core/src/main/java/org/elasticsearch/transport/TransportServiceAdapter.java b/core/src/main/java/org/elasticsearch/transport/TransportServiceAdapter.java index 70748b01a68..24a71a99998 100644 --- a/core/src/main/java/org/elasticsearch/transport/TransportServiceAdapter.java +++ b/core/src/main/java/org/elasticsearch/transport/TransportServiceAdapter.java @@ -23,10 +23,6 @@ import org.elasticsearch.cluster.node.DiscoveryNode; public interface TransportServiceAdapter extends TransportConnectionListener { - void addBytesReceived(long size); - - void addBytesSent(long size); - /** called by the {@link Transport} implementation once a request has been sent */ void onRequestSent(DiscoveryNode node, long requestId, String action, TransportRequest request, TransportRequestOptions options); diff --git a/core/src/test/java/org/elasticsearch/client/transport/FailAndRetryMockTransport.java b/core/src/test/java/org/elasticsearch/client/transport/FailAndRetryMockTransport.java index 142216bf2dd..dbe85898209 100644 --- a/core/src/test/java/org/elasticsearch/client/transport/FailAndRetryMockTransport.java +++ b/core/src/test/java/org/elasticsearch/client/transport/FailAndRetryMockTransport.java @@ -41,6 +41,7 @@ import org.elasticsearch.transport.TransportRequestOptions; import org.elasticsearch.transport.TransportResponse; import org.elasticsearch.transport.TransportResponseHandler; import org.elasticsearch.transport.TransportServiceAdapter; +import org.elasticsearch.transport.TransportStats; import java.io.IOException; import java.net.UnknownHostException; @@ -193,11 +194,6 @@ abstract class FailAndRetryMockTransport imp } - @Override - public long serverOpen() { - return 0; - } - @Override public Lifecycle.State lifecycleState() { return null; @@ -231,4 +227,9 @@ abstract class FailAndRetryMockTransport imp public long newRequestId() { return requestId.incrementAndGet(); } + + @Override + public TransportStats getStats() { + throw new UnsupportedOperationException(); + } } diff --git a/core/src/test/java/org/elasticsearch/cluster/NodeConnectionsServiceTests.java b/core/src/test/java/org/elasticsearch/cluster/NodeConnectionsServiceTests.java index a1b80803e0c..2e7a857cc7b 100644 --- a/core/src/test/java/org/elasticsearch/cluster/NodeConnectionsServiceTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/NodeConnectionsServiceTests.java @@ -41,6 +41,7 @@ import org.elasticsearch.transport.TransportRequest; import org.elasticsearch.transport.TransportRequestOptions; import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.TransportServiceAdapter; +import org.elasticsearch.transport.TransportStats; import org.junit.After; import org.junit.Before; @@ -241,11 +242,6 @@ public class NodeConnectionsServiceTests extends ESTestCase { return getConnection(node); } - @Override - public long serverOpen() { - return 0; - } - @Override public List getLocalAddresses() { return null; @@ -263,12 +259,10 @@ public class NodeConnectionsServiceTests extends ESTestCase { @Override public void addLifecycleListener(LifecycleListener listener) { - } @Override public void removeLifecycleListener(LifecycleListener listener) { - } @Override @@ -279,5 +273,10 @@ public class NodeConnectionsServiceTests extends ESTestCase { @Override public void close() {} + + @Override + public TransportStats getStats() { + throw new UnsupportedOperationException(); + } } } diff --git a/core/src/test/java/org/elasticsearch/transport/TCPTransportTests.java b/core/src/test/java/org/elasticsearch/transport/TCPTransportTests.java index a68416cc25a..6ce6c2a96d6 100644 --- a/core/src/test/java/org/elasticsearch/transport/TCPTransportTests.java +++ b/core/src/test/java/org/elasticsearch/transport/TCPTransportTests.java @@ -235,7 +235,7 @@ public class TCPTransportTests extends ESTestCase { } @Override - public long serverOpen() { + public long getNumOpenServerConnections() { return 0; } diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4MessageChannelHandler.java b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4MessageChannelHandler.java index e83cfc62fda..9763a5116b1 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4MessageChannelHandler.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4MessageChannelHandler.java @@ -22,11 +22,8 @@ package org.elasticsearch.transport.netty4; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelDuplexHandler; import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelPromise; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.transport.TcpHeader; -import org.elasticsearch.transport.TcpTransport; -import org.elasticsearch.transport.TransportServiceAdapter; import org.elasticsearch.transport.Transports; import java.net.InetSocketAddress; @@ -37,25 +34,14 @@ import java.net.InetSocketAddress; */ final class Netty4MessageChannelHandler extends ChannelDuplexHandler { - private final TransportServiceAdapter transportServiceAdapter; private final Netty4Transport transport; private final String profileName; Netty4MessageChannelHandler(Netty4Transport transport, String profileName) { - this.transportServiceAdapter = transport.transportServiceAdapter(); this.transport = transport; this.profileName = profileName; } - @Override - public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { - if (msg instanceof ByteBuf && transportServiceAdapter != null) { - // record the number of bytes send on the channel - promise.addListener(f -> transportServiceAdapter.addBytesSent(((ByteBuf) msg).readableBytes())); - } - ctx.write(msg, promise); - } - @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { Transports.assertTransportThread(); diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java index abe0739c243..140041b53b7 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java @@ -306,7 +306,7 @@ public class Netty4Transport extends TcpTransport { } @Override - public long serverOpen() { + public long getNumOpenServerConnections() { Netty4OpenChannelsHandler channels = serverOpenChannels; return channels == null ? 0 : channels.numberOfOpenChannels(); } diff --git a/test/framework/src/main/java/org/elasticsearch/test/transport/CapturingTransport.java b/test/framework/src/main/java/org/elasticsearch/test/transport/CapturingTransport.java index 55519ec2af2..2ccddf6bc54 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/transport/CapturingTransport.java +++ b/test/framework/src/main/java/org/elasticsearch/test/transport/CapturingTransport.java @@ -40,6 +40,7 @@ import org.elasticsearch.transport.TransportRequest; import org.elasticsearch.transport.TransportRequestOptions; import org.elasticsearch.transport.TransportResponse; import org.elasticsearch.transport.TransportServiceAdapter; +import org.elasticsearch.transport.TransportStats; import java.io.IOException; import java.io.UncheckedIOException; @@ -213,6 +214,11 @@ public class CapturingTransport implements Transport { }; } + @Override + public TransportStats getStats() { + throw new UnsupportedOperationException(); + } + @Override public void transportServiceAdapter(TransportServiceAdapter adapter) { this.adapter = adapter; @@ -250,11 +256,6 @@ public class CapturingTransport implements Transport { } - @Override - public long serverOpen() { - return 0; - } - @Override public Lifecycle.State lifecycleState() { return null; diff --git a/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java b/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java index 210190940d2..467b2c7f3c8 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java +++ b/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java @@ -56,6 +56,7 @@ import org.elasticsearch.transport.TransportRequest; import org.elasticsearch.transport.TransportRequestOptions; import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.TransportServiceAdapter; +import org.elasticsearch.transport.TransportStats; import java.io.IOException; import java.net.UnknownHostException; @@ -572,11 +573,6 @@ public final class MockTransportService extends TransportService { transport.disconnectFromNode(node); } - @Override - public long serverOpen() { - return transport.serverOpen(); - } - @Override public List getLocalAddresses() { return transport.getLocalAddresses(); @@ -609,6 +605,11 @@ public final class MockTransportService extends TransportService { }; } + @Override + public TransportStats getStats() { + return transport.getStats(); + } + @Override public Lifecycle.State lifecycleState() { return transport.lifecycleState(); diff --git a/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java b/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java index 99704235cc7..e4f2fbae917 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java @@ -2252,4 +2252,194 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { assertPendingConnections(0, serviceC.getOriginalTransport()); } + public void testTransportStats() throws IOException, InterruptedException { + MockTransportService serviceC = build(Settings.builder().put("name", "TS_TEST").build(), version0, null, true); + CountDownLatch receivedLatch = new CountDownLatch(1); + CountDownLatch sendResponseLatch = new CountDownLatch(1); + serviceB.registerRequestHandler("action", TestRequest::new, ThreadPool.Names.SAME, + (request, channel) -> { + // don't block on a network thread here + threadPool.generic().execute(new AbstractRunnable() { + @Override + public void onFailure(Exception e) { + try { + channel.sendResponse(e); + } catch (IOException e1) { + throw new UncheckedIOException(e1); + } + } + + @Override + protected void doRun() throws Exception { + receivedLatch.countDown(); + sendResponseLatch.await(); + channel.sendResponse(TransportResponse.Empty.INSTANCE); + } + }); + }); + serviceC.start(); + serviceC.acceptIncomingRequests(); + CountDownLatch responseLatch = new CountDownLatch(1); + TransportResponseHandler transportResponseHandler = new TransportResponseHandler() { + @Override + public TransportResponse newInstance() { + return TransportResponse.Empty.INSTANCE; + } + + @Override + public void handleResponse(TransportResponse response) { + responseLatch.countDown(); + } + + @Override + public void handleException(TransportException exp) { + responseLatch.countDown(); + } + + @Override + public String executor() { + return ThreadPool.Names.SAME; + } + }; + + TransportStats stats = serviceC.transport.getStats(); // nothing transmitted / read yet + assertEquals(0, stats.getRxCount()); + assertEquals(0, stats.getTxCount()); + assertEquals(0, stats.getRxSize().getBytes()); + assertEquals(0, stats.getTxSize().getBytes()); + + ConnectionProfile.Builder builder = new ConnectionProfile.Builder(); + builder.addConnections(1, + TransportRequestOptions.Type.BULK, + TransportRequestOptions.Type.PING, + TransportRequestOptions.Type.RECOVERY, + TransportRequestOptions.Type.REG, + TransportRequestOptions.Type.STATE); + try (Transport.Connection connection = serviceC.openConnection(serviceB.getLocalNode(), builder.build())) { + stats = serviceC.transport.getStats(); // we did a single round-trip to do the initial handshake + assertEquals(1, stats.getRxCount()); + assertEquals(1, stats.getTxCount()); + assertEquals(25, stats.getRxSize().getBytes()); + assertEquals(45, stats.getTxSize().getBytes()); + serviceC.sendRequest(connection, "action", new TestRequest("hello world"), TransportRequestOptions.EMPTY, + transportResponseHandler); + receivedLatch.await(); + stats = serviceC.transport.getStats(); // request has ben send + assertEquals(1, stats.getRxCount()); + assertEquals(2, stats.getTxCount()); + assertEquals(25, stats.getRxSize().getBytes()); + assertEquals(91, stats.getTxSize().getBytes()); + sendResponseLatch.countDown(); + responseLatch.await(); + stats = serviceC.transport.getStats(); // response has been received + assertEquals(2, stats.getRxCount()); + assertEquals(2, stats.getTxCount()); + assertEquals(46, stats.getRxSize().getBytes()); + assertEquals(91, stats.getTxSize().getBytes()); + } finally { + try { + assertPendingConnections(0, serviceC.getOriginalTransport()); + } finally { + serviceC.close(); + } + } + } + + public void testTransportStatsWithException() throws IOException, InterruptedException { + MockTransportService serviceC = build(Settings.builder().put("name", "TS_TEST").build(), version0, null, true); + CountDownLatch receivedLatch = new CountDownLatch(1); + CountDownLatch sendResponseLatch = new CountDownLatch(1); + Exception ex = new RuntimeException("boom"); + ex.setStackTrace(new StackTraceElement[0]); + serviceB.registerRequestHandler("action", TestRequest::new, ThreadPool.Names.SAME, + (request, channel) -> { + // don't block on a network thread here + threadPool.generic().execute(new AbstractRunnable() { + @Override + public void onFailure(Exception e) { + try { + channel.sendResponse(e); + } catch (IOException e1) { + throw new UncheckedIOException(e1); + } + } + + @Override + protected void doRun() throws Exception { + receivedLatch.countDown(); + sendResponseLatch.await(); + onFailure(ex); + } + }); + }); + serviceC.start(); + serviceC.acceptIncomingRequests(); + CountDownLatch responseLatch = new CountDownLatch(1); + TransportResponseHandler transportResponseHandler = new TransportResponseHandler() { + @Override + public TransportResponse newInstance() { + return TransportResponse.Empty.INSTANCE; + } + + @Override + public void handleResponse(TransportResponse response) { + responseLatch.countDown(); + } + + @Override + public void handleException(TransportException exp) { + responseLatch.countDown(); + } + + @Override + public String executor() { + return ThreadPool.Names.SAME; + } + }; + + TransportStats stats = serviceC.transport.getStats(); // nothing transmitted / read yet + assertEquals(0, stats.getRxCount()); + assertEquals(0, stats.getTxCount()); + assertEquals(0, stats.getRxSize().getBytes()); + assertEquals(0, stats.getTxSize().getBytes()); + + ConnectionProfile.Builder builder = new ConnectionProfile.Builder(); + builder.addConnections(1, + TransportRequestOptions.Type.BULK, + TransportRequestOptions.Type.PING, + TransportRequestOptions.Type.RECOVERY, + TransportRequestOptions.Type.REG, + TransportRequestOptions.Type.STATE); + try (Transport.Connection connection = serviceC.openConnection(serviceB.getLocalNode(), builder.build())) { + stats = serviceC.transport.getStats(); // we did a single round-trip to do the initial handshake + assertEquals(1, stats.getRxCount()); + assertEquals(1, stats.getTxCount()); + assertEquals(25, stats.getRxSize().getBytes()); + assertEquals(45, stats.getTxSize().getBytes()); + serviceC.sendRequest(connection, "action", new TestRequest("hello world"), TransportRequestOptions.EMPTY, + transportResponseHandler); + receivedLatch.await(); + stats = serviceC.transport.getStats(); // request has ben send + assertEquals(1, stats.getRxCount()); + assertEquals(2, stats.getTxCount()); + assertEquals(25, stats.getRxSize().getBytes()); + assertEquals(91, stats.getTxSize().getBytes()); + sendResponseLatch.countDown(); + responseLatch.await(); + stats = serviceC.transport.getStats(); // exception response has been received + assertEquals(2, stats.getRxCount()); + assertEquals(2, stats.getTxCount()); + int addressLen = serviceB.boundAddress().publishAddress().address().getAddress().getAddress().length; + // if we are bound to a IPv6 address the response address is serialized with the exception so it will be different depending + // on the stack. The emphemeral port will always be in the same range + assertEquals(185 + addressLen, stats.getRxSize().getBytes()); + assertEquals(91, stats.getTxSize().getBytes()); + } finally { + try { + assertPendingConnections(0, serviceC.getOriginalTransport()); + } finally { + serviceC.close(); + } + } + } } diff --git a/test/framework/src/main/java/org/elasticsearch/transport/MockTcpTransport.java b/test/framework/src/main/java/org/elasticsearch/transport/MockTcpTransport.java index 38a1701a7e1..94f5351cae7 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/MockTcpTransport.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/MockTcpTransport.java @@ -248,7 +248,7 @@ public class MockTcpTransport extends TcpTransport } @Override - public long serverOpen() { + public long getNumOpenServerConnections() { return 1; } @@ -306,7 +306,9 @@ public class MockTcpTransport extends TcpTransport configureSocket(incomingSocket); synchronized (this) { if (isOpen.get()) { - incomingChannel = new MockChannel(incomingSocket, localAddress, profile, workerChannels::remove); + incomingChannel = new MockChannel(incomingSocket, + new InetSocketAddress(incomingSocket.getLocalAddress(), incomingSocket.getPort()), profile, + workerChannels::remove); //establish a happens-before edge between closing and accepting a new connection workerChannels.add(incomingChannel);