Move TransportStats accounting into TcpTransport (#25251)

Today TcpTransport is the de-facto base-class for transport implementations.
The need for all the callbacks we have in TransportServiceAdaptor are not necessary
anymore since we can simply have the logic inside the base class itself. This change
moves the stats metrics directly into TcpTransport removing the need for low level
bytes send / received callbacks.
This commit is contained in:
Simon Willnauer 2017-06-16 22:34:11 +02:00 committed by GitHub
parent ecc87f613f
commit f18b0d293c
13 changed files with 280 additions and 85 deletions

View File

@ -50,6 +50,7 @@ import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.metrics.CounterMetric;
import org.elasticsearch.common.metrics.MeanMetric;
import org.elasticsearch.common.network.NetworkAddress;
import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.network.NetworkUtils;
@ -181,6 +182,9 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
private final CounterMetric numHandshakes = new CounterMetric();
private static final String HANDSHAKE_ACTION_NAME = "internal:tcp/handshake";
private final MeanMetric readBytesMetric = new MeanMetric();
private final MeanMetric transmittedBytesMetric = new MeanMetric();
public TcpTransport(String transportName, Settings settings, ThreadPool threadPool, BigArrays bigArrays,
CircuitBreakerService circuitBreakerService, NamedWriteableRegistry namedWriteableRegistry,
NetworkService networkService) {
@ -300,14 +304,14 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
DiscoveryNode node = entry.getKey();
NodeChannels channels = entry.getValue();
for (Channel channel : channels.getChannels()) {
internalSendMessage(channel, pingHeader, new NotifyOnceListener<Channel>() {
internalSendMessage(channel, pingHeader, new SendMetricListener<Channel>(pingHeader.length()) {
@Override
public void innerOnResponse(Channel channel) {
protected void innerInnerOnResponse(Channel channel) {
successfulPings.inc();
}
@Override
public void innerOnFailure(Exception e) {
protected void innerOnFailure(Exception e) {
if (isOpen(channel)) {
logger.debug(
(Supplier<?>) () -> new ParameterizedMessage("[{}] failed to send ping transport message", node), e);
@ -984,9 +988,10 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
} else if (e instanceof TcpTransport.HttpOnTransportException) {
// in case we are able to return data, serialize the exception content and sent it back to the client
if (isOpen(channel)) {
final NotifyOnceListener<Channel> closeChannel = new NotifyOnceListener<Channel>() {
BytesArray message = new BytesArray(e.getMessage().getBytes(StandardCharsets.UTF_8));
final SendMetricListener<Channel> closeChannel = new SendMetricListener<Channel>(message.length()) {
@Override
public void innerOnResponse(Channel channel) {
protected void innerInnerOnResponse(Channel channel) {
try {
closeChannels(Collections.singletonList(channel));
} catch (IOException e1) {
@ -995,7 +1000,7 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
}
@Override
public void innerOnFailure(Exception e) {
protected void innerOnFailure(Exception e) {
try {
closeChannels(Collections.singletonList(channel));
} catch (IOException e1) {
@ -1004,7 +1009,7 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
}
}
};
internalSendMessage(channel, new BytesArray(e.getMessage().getBytes(StandardCharsets.UTF_8)), closeChannel);
internalSendMessage(channel, message, closeChannel);
}
} else {
logger.warn(
@ -1086,7 +1091,7 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
final TransportRequestOptions finalOptions = options;
// this might be called in a different thread
SendListener onRequestSent = new SendListener(stream,
() -> transportServiceAdapter.onRequestSent(node, requestId, action, request, finalOptions));
() -> transportServiceAdapter.onRequestSent(node, requestId, action, request, finalOptions), message.length());
internalSendMessage(targetChannel, message, onRequestSent);
addedReleaseListener = true;
} finally {
@ -1099,7 +1104,7 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
/**
* sends a message to the given channel, using the given callbacks.
*/
private void internalSendMessage(Channel targetChannel, BytesReference message, NotifyOnceListener<Channel> listener) {
private void internalSendMessage(Channel targetChannel, BytesReference message, SendMetricListener<Channel> listener) {
try {
sendMessage(targetChannel, message, listener);
} catch (Exception ex) {
@ -1131,9 +1136,10 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
status = TransportStatus.setError(status);
final BytesReference bytes = stream.bytes();
final BytesReference header = buildHeader(requestId, status, nodeVersion, bytes.length());
CompositeBytesReference message = new CompositeBytesReference(header, bytes);
SendListener onResponseSent = new SendListener(null,
() -> transportServiceAdapter.onResponseSent(requestId, action, error));
internalSendMessage(channel, new CompositeBytesReference(header, bytes), onResponseSent);
() -> transportServiceAdapter.onResponseSent(requestId, action, error), message.length());
internalSendMessage(channel, message, onResponseSent);
}
}
@ -1162,13 +1168,13 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
}
threadPool.getThreadContext().writeTo(stream);
stream.setVersion(nodeVersion);
BytesReference reference = buildMessage(requestId, status, nodeVersion, response, stream);
BytesReference message = buildMessage(requestId, status, nodeVersion, response, stream);
final TransportResponseOptions finalOptions = options;
// this might be called in a different thread
SendListener listener = new SendListener(stream,
() -> transportServiceAdapter.onResponseSent(requestId, action, response, finalOptions));
internalSendMessage(channel, reference, listener);
() -> transportServiceAdapter.onResponseSent(requestId, action, response, finalOptions), message.length());
internalSendMessage(channel, message, listener);
addedReleaseListener = true;
} finally {
if (!addedReleaseListener) {
@ -1324,7 +1330,7 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
public final void messageReceived(BytesReference reference, Channel channel, String profileName,
InetSocketAddress remoteAddress, int messageLengthBytes) throws IOException {
final int totalMessageSize = messageLengthBytes + TcpHeader.MARKER_BYTES_SIZE + TcpHeader.MESSAGE_LENGTH_SIZE;
transportServiceAdapter.addBytesReceived(totalMessageSize);
readBytesMetric.inc(totalMessageSize);
// we have additional bytes to read, outside of the header
boolean hasMessageBytesToRead = (totalMessageSize - TcpHeader.HEADER_SIZE) > 0;
StreamInput streamIn = reference.streamInput();
@ -1662,22 +1668,42 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
}
}
private final class SendListener extends NotifyOnceListener<Channel> {
/**
* This listener increments the transmitted bytes metric on success.
*/
private abstract class SendMetricListener<T> extends NotifyOnceListener<T> {
private final long messageSize;
private SendMetricListener(long messageSize) {
this.messageSize = messageSize;
}
@Override
protected final void innerOnResponse(T object) {
transmittedBytesMetric.inc(messageSize);
innerInnerOnResponse(object);
}
protected abstract void innerInnerOnResponse(T object);
}
private final class SendListener extends SendMetricListener<Channel> {
private final Releasable optionalReleasable;
private final Runnable transportAdaptorCallback;
private SendListener(Releasable optionalReleasable, Runnable transportAdaptorCallback) {
private SendListener(Releasable optionalReleasable, Runnable transportAdaptorCallback, long messageLength) {
super(messageLength);
this.optionalReleasable = optionalReleasable;
this.transportAdaptorCallback = transportAdaptorCallback;
}
@Override
public void innerOnResponse(Channel channel) {
protected void innerInnerOnResponse(Channel channel) {
release();
}
@Override
public void innerOnFailure(Exception e) {
protected void innerOnFailure(Exception e) {
release();
}
@ -1701,4 +1727,16 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
final int getNumConnectedNodes() {
return connectedNodes.size();
}
/**
* Returns count of currently open connections
*/
protected abstract long getNumOpenServerConnections();
@Override
public final TransportStats getStats() {
return new TransportStats(
getNumOpenServerConnections(), readBytesMetric.count(), readBytesMetric.sum(), transmittedBytesMetric.count(),
transmittedBytesMetric.sum());
}
}

View File

@ -75,11 +75,6 @@ public interface Transport extends LifecycleComponent {
*/
void disconnectFromNode(DiscoveryNode node);
/**
* Returns count of currently open connections
*/
long serverOpen();
List<String> getLocalAddresses();
default CircuitBreaker getInFlightRequestBreaker() {
@ -110,6 +105,8 @@ public interface Transport extends LifecycleComponent {
*/
Connection openConnection(DiscoveryNode node, ConnectionProfile profile) throws IOException;
TransportStats getStats();
/**
* A unidirectional connection to a {@link DiscoveryNode}
*/

View File

@ -203,8 +203,6 @@ public class TransportService extends AbstractLifecycleComponent {
@Override
protected void doStart() {
adapter.rxMetric.clear();
adapter.txMetric.clear();
transport.transportServiceAdapter(adapter);
transport.start();
@ -292,8 +290,7 @@ public class TransportService extends AbstractLifecycleComponent {
}
public TransportStats stats() {
return new TransportStats(
transport.serverOpen(), adapter.rxMetric.count(), adapter.rxMetric.sum(), adapter.txMetric.count(), adapter.txMetric.sum());
return transport.getStats();
}
public BoundTransportAddress boundAddress() {
@ -738,19 +735,6 @@ public class TransportService extends AbstractLifecycleComponent {
protected class Adapter implements TransportServiceAdapter {
final MeanMetric rxMetric = new MeanMetric();
final MeanMetric txMetric = new MeanMetric();
@Override
public void addBytesReceived(long size) {
rxMetric.inc(size);
}
@Override
public void addBytesSent(long size) {
txMetric.inc(size);
}
@Override
public void onRequestSent(DiscoveryNode node, long requestId, String action, TransportRequest request,
TransportRequestOptions options) {

View File

@ -23,10 +23,6 @@ import org.elasticsearch.cluster.node.DiscoveryNode;
public interface TransportServiceAdapter extends TransportConnectionListener {
void addBytesReceived(long size);
void addBytesSent(long size);
/** called by the {@link Transport} implementation once a request has been sent */
void onRequestSent(DiscoveryNode node, long requestId, String action, TransportRequest request, TransportRequestOptions options);

View File

@ -41,6 +41,7 @@ import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportResponseHandler;
import org.elasticsearch.transport.TransportServiceAdapter;
import org.elasticsearch.transport.TransportStats;
import java.io.IOException;
import java.net.UnknownHostException;
@ -193,11 +194,6 @@ abstract class FailAndRetryMockTransport<Response extends TransportResponse> imp
}
@Override
public long serverOpen() {
return 0;
}
@Override
public Lifecycle.State lifecycleState() {
return null;
@ -231,4 +227,9 @@ abstract class FailAndRetryMockTransport<Response extends TransportResponse> imp
public long newRequestId() {
return requestId.incrementAndGet();
}
@Override
public TransportStats getStats() {
throw new UnsupportedOperationException();
}
}

View File

@ -41,6 +41,7 @@ import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.transport.TransportServiceAdapter;
import org.elasticsearch.transport.TransportStats;
import org.junit.After;
import org.junit.Before;
@ -241,11 +242,6 @@ public class NodeConnectionsServiceTests extends ESTestCase {
return getConnection(node);
}
@Override
public long serverOpen() {
return 0;
}
@Override
public List<String> getLocalAddresses() {
return null;
@ -263,12 +259,10 @@ public class NodeConnectionsServiceTests extends ESTestCase {
@Override
public void addLifecycleListener(LifecycleListener listener) {
}
@Override
public void removeLifecycleListener(LifecycleListener listener) {
}
@Override
@ -279,5 +273,10 @@ public class NodeConnectionsServiceTests extends ESTestCase {
@Override
public void close() {}
@Override
public TransportStats getStats() {
throw new UnsupportedOperationException();
}
}
}

View File

@ -235,7 +235,7 @@ public class TCPTransportTests extends ESTestCase {
}
@Override
public long serverOpen() {
public long getNumOpenServerConnections() {
return 0;
}

View File

@ -22,11 +22,8 @@ package org.elasticsearch.transport.netty4;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.transport.TcpHeader;
import org.elasticsearch.transport.TcpTransport;
import org.elasticsearch.transport.TransportServiceAdapter;
import org.elasticsearch.transport.Transports;
import java.net.InetSocketAddress;
@ -37,25 +34,14 @@ import java.net.InetSocketAddress;
*/
final class Netty4MessageChannelHandler extends ChannelDuplexHandler {
private final TransportServiceAdapter transportServiceAdapter;
private final Netty4Transport transport;
private final String profileName;
Netty4MessageChannelHandler(Netty4Transport transport, String profileName) {
this.transportServiceAdapter = transport.transportServiceAdapter();
this.transport = transport;
this.profileName = profileName;
}
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
if (msg instanceof ByteBuf && transportServiceAdapter != null) {
// record the number of bytes send on the channel
promise.addListener(f -> transportServiceAdapter.addBytesSent(((ByteBuf) msg).readableBytes()));
}
ctx.write(msg, promise);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
Transports.assertTransportThread();

View File

@ -306,7 +306,7 @@ public class Netty4Transport extends TcpTransport<Channel> {
}
@Override
public long serverOpen() {
public long getNumOpenServerConnections() {
Netty4OpenChannelsHandler channels = serverOpenChannels;
return channels == null ? 0 : channels.numberOfOpenChannels();
}

View File

@ -40,6 +40,7 @@ import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportServiceAdapter;
import org.elasticsearch.transport.TransportStats;
import java.io.IOException;
import java.io.UncheckedIOException;
@ -213,6 +214,11 @@ public class CapturingTransport implements Transport {
};
}
@Override
public TransportStats getStats() {
throw new UnsupportedOperationException();
}
@Override
public void transportServiceAdapter(TransportServiceAdapter adapter) {
this.adapter = adapter;
@ -250,11 +256,6 @@ public class CapturingTransport implements Transport {
}
@Override
public long serverOpen() {
return 0;
}
@Override
public Lifecycle.State lifecycleState() {
return null;

View File

@ -56,6 +56,7 @@ import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.transport.TransportServiceAdapter;
import org.elasticsearch.transport.TransportStats;
import java.io.IOException;
import java.net.UnknownHostException;
@ -572,11 +573,6 @@ public final class MockTransportService extends TransportService {
transport.disconnectFromNode(node);
}
@Override
public long serverOpen() {
return transport.serverOpen();
}
@Override
public List<String> getLocalAddresses() {
return transport.getLocalAddresses();
@ -609,6 +605,11 @@ public final class MockTransportService extends TransportService {
};
}
@Override
public TransportStats getStats() {
return transport.getStats();
}
@Override
public Lifecycle.State lifecycleState() {
return transport.lifecycleState();

View File

@ -2252,4 +2252,194 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
assertPendingConnections(0, serviceC.getOriginalTransport());
}
public void testTransportStats() throws IOException, InterruptedException {
MockTransportService serviceC = build(Settings.builder().put("name", "TS_TEST").build(), version0, null, true);
CountDownLatch receivedLatch = new CountDownLatch(1);
CountDownLatch sendResponseLatch = new CountDownLatch(1);
serviceB.registerRequestHandler("action", TestRequest::new, ThreadPool.Names.SAME,
(request, channel) -> {
// don't block on a network thread here
threadPool.generic().execute(new AbstractRunnable() {
@Override
public void onFailure(Exception e) {
try {
channel.sendResponse(e);
} catch (IOException e1) {
throw new UncheckedIOException(e1);
}
}
@Override
protected void doRun() throws Exception {
receivedLatch.countDown();
sendResponseLatch.await();
channel.sendResponse(TransportResponse.Empty.INSTANCE);
}
});
});
serviceC.start();
serviceC.acceptIncomingRequests();
CountDownLatch responseLatch = new CountDownLatch(1);
TransportResponseHandler<TransportResponse> transportResponseHandler = new TransportResponseHandler<TransportResponse>() {
@Override
public TransportResponse newInstance() {
return TransportResponse.Empty.INSTANCE;
}
@Override
public void handleResponse(TransportResponse response) {
responseLatch.countDown();
}
@Override
public void handleException(TransportException exp) {
responseLatch.countDown();
}
@Override
public String executor() {
return ThreadPool.Names.SAME;
}
};
TransportStats stats = serviceC.transport.getStats(); // nothing transmitted / read yet
assertEquals(0, stats.getRxCount());
assertEquals(0, stats.getTxCount());
assertEquals(0, stats.getRxSize().getBytes());
assertEquals(0, stats.getTxSize().getBytes());
ConnectionProfile.Builder builder = new ConnectionProfile.Builder();
builder.addConnections(1,
TransportRequestOptions.Type.BULK,
TransportRequestOptions.Type.PING,
TransportRequestOptions.Type.RECOVERY,
TransportRequestOptions.Type.REG,
TransportRequestOptions.Type.STATE);
try (Transport.Connection connection = serviceC.openConnection(serviceB.getLocalNode(), builder.build())) {
stats = serviceC.transport.getStats(); // we did a single round-trip to do the initial handshake
assertEquals(1, stats.getRxCount());
assertEquals(1, stats.getTxCount());
assertEquals(25, stats.getRxSize().getBytes());
assertEquals(45, stats.getTxSize().getBytes());
serviceC.sendRequest(connection, "action", new TestRequest("hello world"), TransportRequestOptions.EMPTY,
transportResponseHandler);
receivedLatch.await();
stats = serviceC.transport.getStats(); // request has ben send
assertEquals(1, stats.getRxCount());
assertEquals(2, stats.getTxCount());
assertEquals(25, stats.getRxSize().getBytes());
assertEquals(91, stats.getTxSize().getBytes());
sendResponseLatch.countDown();
responseLatch.await();
stats = serviceC.transport.getStats(); // response has been received
assertEquals(2, stats.getRxCount());
assertEquals(2, stats.getTxCount());
assertEquals(46, stats.getRxSize().getBytes());
assertEquals(91, stats.getTxSize().getBytes());
} finally {
try {
assertPendingConnections(0, serviceC.getOriginalTransport());
} finally {
serviceC.close();
}
}
}
public void testTransportStatsWithException() throws IOException, InterruptedException {
MockTransportService serviceC = build(Settings.builder().put("name", "TS_TEST").build(), version0, null, true);
CountDownLatch receivedLatch = new CountDownLatch(1);
CountDownLatch sendResponseLatch = new CountDownLatch(1);
Exception ex = new RuntimeException("boom");
ex.setStackTrace(new StackTraceElement[0]);
serviceB.registerRequestHandler("action", TestRequest::new, ThreadPool.Names.SAME,
(request, channel) -> {
// don't block on a network thread here
threadPool.generic().execute(new AbstractRunnable() {
@Override
public void onFailure(Exception e) {
try {
channel.sendResponse(e);
} catch (IOException e1) {
throw new UncheckedIOException(e1);
}
}
@Override
protected void doRun() throws Exception {
receivedLatch.countDown();
sendResponseLatch.await();
onFailure(ex);
}
});
});
serviceC.start();
serviceC.acceptIncomingRequests();
CountDownLatch responseLatch = new CountDownLatch(1);
TransportResponseHandler<TransportResponse> transportResponseHandler = new TransportResponseHandler<TransportResponse>() {
@Override
public TransportResponse newInstance() {
return TransportResponse.Empty.INSTANCE;
}
@Override
public void handleResponse(TransportResponse response) {
responseLatch.countDown();
}
@Override
public void handleException(TransportException exp) {
responseLatch.countDown();
}
@Override
public String executor() {
return ThreadPool.Names.SAME;
}
};
TransportStats stats = serviceC.transport.getStats(); // nothing transmitted / read yet
assertEquals(0, stats.getRxCount());
assertEquals(0, stats.getTxCount());
assertEquals(0, stats.getRxSize().getBytes());
assertEquals(0, stats.getTxSize().getBytes());
ConnectionProfile.Builder builder = new ConnectionProfile.Builder();
builder.addConnections(1,
TransportRequestOptions.Type.BULK,
TransportRequestOptions.Type.PING,
TransportRequestOptions.Type.RECOVERY,
TransportRequestOptions.Type.REG,
TransportRequestOptions.Type.STATE);
try (Transport.Connection connection = serviceC.openConnection(serviceB.getLocalNode(), builder.build())) {
stats = serviceC.transport.getStats(); // we did a single round-trip to do the initial handshake
assertEquals(1, stats.getRxCount());
assertEquals(1, stats.getTxCount());
assertEquals(25, stats.getRxSize().getBytes());
assertEquals(45, stats.getTxSize().getBytes());
serviceC.sendRequest(connection, "action", new TestRequest("hello world"), TransportRequestOptions.EMPTY,
transportResponseHandler);
receivedLatch.await();
stats = serviceC.transport.getStats(); // request has ben send
assertEquals(1, stats.getRxCount());
assertEquals(2, stats.getTxCount());
assertEquals(25, stats.getRxSize().getBytes());
assertEquals(91, stats.getTxSize().getBytes());
sendResponseLatch.countDown();
responseLatch.await();
stats = serviceC.transport.getStats(); // exception response has been received
assertEquals(2, stats.getRxCount());
assertEquals(2, stats.getTxCount());
int addressLen = serviceB.boundAddress().publishAddress().address().getAddress().getAddress().length;
// if we are bound to a IPv6 address the response address is serialized with the exception so it will be different depending
// on the stack. The emphemeral port will always be in the same range
assertEquals(185 + addressLen, stats.getRxSize().getBytes());
assertEquals(91, stats.getTxSize().getBytes());
} finally {
try {
assertPendingConnections(0, serviceC.getOriginalTransport());
} finally {
serviceC.close();
}
}
}
}

View File

@ -248,7 +248,7 @@ public class MockTcpTransport extends TcpTransport<MockTcpTransport.MockChannel>
}
@Override
public long serverOpen() {
public long getNumOpenServerConnections() {
return 1;
}
@ -306,7 +306,9 @@ public class MockTcpTransport extends TcpTransport<MockTcpTransport.MockChannel>
configureSocket(incomingSocket);
synchronized (this) {
if (isOpen.get()) {
incomingChannel = new MockChannel(incomingSocket, localAddress, profile, workerChannels::remove);
incomingChannel = new MockChannel(incomingSocket,
new InetSocketAddress(incomingSocket.getLocalAddress(), incomingSocket.getPort()), profile,
workerChannels::remove);
//establish a happens-before edge between closing and accepting a new connection
workerChannels.add(incomingChannel);