diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/cluster/node/info/NodeInfo.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/cluster/node/info/NodeInfo.java index a4fa50a0086..c35e94ee7cd 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/cluster/node/info/NodeInfo.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/cluster/node/info/NodeInfo.java @@ -26,6 +26,7 @@ import org.elasticsearch.monitor.network.NetworkInfo; import org.elasticsearch.monitor.os.OsInfo; import org.elasticsearch.monitor.process.ProcessInfo; import org.elasticsearch.threadpool.ThreadPoolInfo; +import org.elasticsearch.transport.TransportInfo; import org.elasticsearch.util.collect.ImmutableMap; import org.elasticsearch.util.io.stream.StreamInput; import org.elasticsearch.util.io.stream.StreamOutput; @@ -56,11 +57,14 @@ public class NodeInfo extends NodeOperationResponse { private ThreadPoolInfo threadPool; + private TransportInfo transport; + NodeInfo() { } public NodeInfo(DiscoveryNode node, ImmutableMap attributes, Settings settings, - OsInfo os, ProcessInfo process, JvmInfo jvm, NetworkInfo network, ThreadPoolInfo threadPool) { + OsInfo os, ProcessInfo process, JvmInfo jvm, NetworkInfo network, ThreadPoolInfo threadPool, + TransportInfo transport) { super(node); this.attributes = attributes; this.settings = settings; @@ -69,6 +73,7 @@ public class NodeInfo extends NodeOperationResponse { this.jvm = jvm; this.network = network; this.threadPool = threadPool; + this.transport = transport; } /** @@ -169,6 +174,14 @@ public class NodeInfo extends NodeOperationResponse { return threadPool(); } + public TransportInfo transport() { + return transport; + } + + public TransportInfo getTransport() { + return transport(); + } + public static NodeInfo readNodeInfo(StreamInput in) throws IOException { NodeInfo nodeInfo = new NodeInfo(); nodeInfo.readFrom(in); @@ -199,6 +212,9 @@ public class NodeInfo extends NodeOperationResponse { if (in.readBoolean()) { threadPool = ThreadPoolInfo.readThreadPoolInfo(in); } + if (in.readBoolean()) { + transport = TransportInfo.readTransportInfo(in); + } } @Override public void writeTo(StreamOutput out) throws IOException { @@ -239,5 +255,11 @@ public class NodeInfo extends NodeOperationResponse { out.writeBoolean(true); threadPool.writeTo(out); } + if (transport == null) { + out.writeBoolean(false); + } else { + out.writeBoolean(true); + transport.writeTo(out); + } } } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/cluster/node/info/TransportNodesInfoAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/cluster/node/info/TransportNodesInfoAction.java index d078432859e..18e8f5bc5f2 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/cluster/node/info/TransportNodesInfoAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/cluster/node/info/TransportNodesInfoAction.java @@ -100,7 +100,7 @@ public class TransportNodesInfoAction extends TransportNodesOperationAction connectionListeners = new CopyOnWriteArrayList(); + final AtomicLong rxBytes = new AtomicLong(); + + final AtomicLong rxCount = new AtomicLong(); + + final AtomicLong txBytes = new AtomicLong(); + + final AtomicLong txCount = new AtomicLong(); + // An LRU (don't really care about concurrency here) that holds the latest timed out requests so if they // do show up, we can print more descriptive information about them final Map timeoutInfoHandlers = Collections.synchronizedMap(new LinkedHashMap(100, .75F, true) { @@ -106,6 +114,14 @@ public class TransportService extends AbstractLifecycleComponent implem } final byte[] data = ((BytesStreamOutput) stream.wrappedOut()).copiedByteArray(); + + transportServiceAdapter.sent(data.length); + threadPool.execute(new Runnable() { @Override public void run() { targetTransport.messageReceived(data, action, LocalTransport.this, handler); @@ -164,6 +167,8 @@ public class LocalTransport extends AbstractLifecycleComponent implem } void messageReceived(byte[] data, String action, LocalTransport sourceTransport, @Nullable final TransportResponseHandler responseHandler) { + transportServiceAdapter.received(data.length); + StreamInput stream = new BytesStreamInput(data); stream = HandlesStreamInput.Cached.cached(stream); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/transport/netty/MessageChannelHandler.java b/modules/elasticsearch/src/main/java/org/elasticsearch/transport/netty/MessageChannelHandler.java index 67b30321925..a7a89b2cdcf 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/transport/netty/MessageChannelHandler.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/transport/netty/MessageChannelHandler.java @@ -27,10 +27,7 @@ import org.elasticsearch.util.io.stream.StreamInput; import org.elasticsearch.util.io.stream.Streamable; import org.elasticsearch.util.logging.ESLogger; import org.jboss.netty.buffer.ChannelBuffer; -import org.jboss.netty.channel.ChannelHandlerContext; -import org.jboss.netty.channel.ExceptionEvent; -import org.jboss.netty.channel.MessageEvent; -import org.jboss.netty.channel.SimpleChannelUpstreamHandler; +import org.jboss.netty.channel.*; import java.io.IOException; @@ -56,10 +53,18 @@ public class MessageChannelHandler extends SimpleChannelUpstreamHandler { this.logger = logger; } + @Override public void writeComplete(ChannelHandlerContext ctx, WriteCompletionEvent e) throws Exception { + transportServiceAdapter.sent(e.getWrittenAmount()); + super.writeComplete(ctx, e); + } + @Override public void messageReceived(ChannelHandlerContext ctx, MessageEvent event) throws Exception { ChannelBuffer buffer = (ChannelBuffer) event.getMessage(); int size = buffer.getInt(buffer.readerIndex() - 4); + + transportServiceAdapter.received(size); + int markedReaderIndex = buffer.readerIndex(); int expectedIndexReader = markedReaderIndex + size; diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/util/transport/BoundTransportAddress.java b/modules/elasticsearch/src/main/java/org/elasticsearch/util/transport/BoundTransportAddress.java index b36e3724d10..2b21f1ba745 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/util/transport/BoundTransportAddress.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/util/transport/BoundTransportAddress.java @@ -19,6 +19,12 @@ package org.elasticsearch.util.transport; +import org.elasticsearch.util.io.stream.StreamInput; +import org.elasticsearch.util.io.stream.StreamOutput; +import org.elasticsearch.util.io.stream.Streamable; + +import java.io.IOException; + /** * A bounded transport address is a tuple of two {@link TransportAddress}, one that represents * the address the transport is bounded on, the the published one represents the one clients should @@ -26,11 +32,14 @@ package org.elasticsearch.util.transport; * * @author kimchy (shay.banon) */ -public class BoundTransportAddress { +public class BoundTransportAddress implements Streamable { - private final TransportAddress boundAddress; + private TransportAddress boundAddress; - private final TransportAddress publishAddress; + private TransportAddress publishAddress; + + BoundTransportAddress() { + } public BoundTransportAddress(TransportAddress boundAddress, TransportAddress publishAddress) { this.boundAddress = boundAddress; @@ -45,6 +54,22 @@ public class BoundTransportAddress { return publishAddress; } + public static BoundTransportAddress readBoundTransportAddress(StreamInput in) throws IOException { + BoundTransportAddress addr = new BoundTransportAddress(); + addr.readFrom(in); + return addr; + } + + @Override public void readFrom(StreamInput in) throws IOException { + boundAddress = TransportAddressSerializers.addressFromStream(in); + publishAddress = TransportAddressSerializers.addressFromStream(in); + } + + @Override public void writeTo(StreamOutput out) throws IOException { + TransportAddressSerializers.addressToStream(out, boundAddress); + TransportAddressSerializers.addressToStream(out, publishAddress); + } + @Override public String toString() { return "bound_address[" + boundAddress + "], publish_address[" + publishAddress + "]"; }