From 31483e4a921f122e09715bc94d09c7781fb730d6 Mon Sep 17 00:00:00 2001 From: kimchy Date: Mon, 30 May 2011 12:53:03 +0300 Subject: [PATCH] Node Stats: Remove low level transport stats from response, closes #979. --- .../admin/cluster/node/stats/NodeStats.java | 24 +--- .../node/stats/TransportNodesStatsAction.java | 3 +- .../node/stats/RestNodesStatsAction.java | 11 +- .../transport/TransportService.java | 19 --- .../transport/TransportServiceAdapter.java | 4 - .../transport/TransportStats.java | 118 ------------------ .../transport/local/LocalTransport.java | 12 +- .../netty/MessageChannelHandler.java | 20 +-- 8 files changed, 27 insertions(+), 184 deletions(-) delete mode 100644 modules/elasticsearch/src/main/java/org/elasticsearch/transport/TransportStats.java diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/NodeStats.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/NodeStats.java index ce1bf381042..64267a60128 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/NodeStats.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/NodeStats.java @@ -28,7 +28,6 @@ import org.elasticsearch.monitor.jvm.JvmStats; import org.elasticsearch.monitor.network.NetworkStats; import org.elasticsearch.monitor.os.OsStats; import org.elasticsearch.monitor.process.ProcessStats; -import org.elasticsearch.transport.TransportStats; import java.io.IOException; @@ -49,21 +48,17 @@ public class NodeStats extends NodeOperationResponse { private NetworkStats network; - private TransportStats transport; - NodeStats() { } public NodeStats(DiscoveryNode node, NodeIndicesStats indices, - OsStats os, ProcessStats process, JvmStats jvm, NetworkStats network, - TransportStats transport) { + OsStats os, ProcessStats process, JvmStats jvm, NetworkStats network) { super(node); this.indices = indices; this.os = os; this.process = process; this.jvm = jvm; this.network = network; - this.transport = transport; } /** @@ -136,14 +131,6 @@ public class NodeStats extends NodeOperationResponse { return network(); } - public TransportStats transport() { - return transport; - } - - public TransportStats getTransport() { - return transport(); - } - public static NodeStats readNodeStats(StreamInput in) throws IOException { NodeStats nodeInfo = new NodeStats(); nodeInfo.readFrom(in); @@ -167,9 +154,6 @@ public class NodeStats extends NodeOperationResponse { if (in.readBoolean()) { network = NetworkStats.readNetworkStats(in); } - if (in.readBoolean()) { - transport = TransportStats.readTransportStats(in); - } } @Override public void writeTo(StreamOutput out) throws IOException { @@ -204,11 +188,5 @@ public class NodeStats extends NodeOperationResponse { out.writeBoolean(true); network.writeTo(out); } - if (transport == null) { - out.writeBoolean(false); - } else { - out.writeBoolean(true); - transport.writeTo(out); - } } } \ No newline at end of file diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/TransportNodesStatsAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/TransportNodesStatsAction.java index af7d176a167..58b29680ec4 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/TransportNodesStatsAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/TransportNodesStatsAction.java @@ -95,8 +95,7 @@ public class TransportNodesStatsAction extends TransportNodesOperationAction connectionListeners = new CopyOnWriteArrayList(); - final AtomicLong rxBytes = new AtomicLong(); - final AtomicLong rxCount = new AtomicLong(); - final AtomicLong txBytes = new AtomicLong(); - final AtomicLong txCount = new AtomicLong(); - // An LRU (don't really care about concurrency here) that holds the latest timed out requests so if they // do show up, we can print more descriptive information about them final Map timeoutInfoHandlers = Collections.synchronizedMap(new LinkedHashMap(100, .75F, true) { @@ -110,10 +105,6 @@ public class TransportService extends AbstractLifecycleComponent implem final byte[] data = ((BytesStreamOutput) stream.wrappedOut()).copiedByteArray(); - transportServiceAdapter.sent(data.length); - threadPool.cached().execute(new Runnable() { @Override public void run() { targetTransport.messageReceived(data, action, LocalTransport.this, requestId); @@ -171,8 +175,6 @@ public class LocalTransport extends AbstractLifecycleComponent implem } void messageReceived(byte[] data, String action, LocalTransport sourceTransport, @Nullable final Long sendRequestId) { - transportServiceAdapter.received(data.length); - StreamInput stream = new BytesStreamInput(data); stream = CachedStreamInput.cachedHandles(stream); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/transport/netty/MessageChannelHandler.java b/modules/elasticsearch/src/main/java/org/elasticsearch/transport/netty/MessageChannelHandler.java index 9cef4537173..6a9b49dceac 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/transport/netty/MessageChannelHandler.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/transport/netty/MessageChannelHandler.java @@ -26,9 +26,18 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.Streamable; import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.netty.buffer.ChannelBuffer; -import org.elasticsearch.common.netty.channel.*; +import org.elasticsearch.common.netty.channel.ChannelHandlerContext; +import org.elasticsearch.common.netty.channel.ExceptionEvent; +import org.elasticsearch.common.netty.channel.MessageEvent; +import org.elasticsearch.common.netty.channel.SimpleChannelUpstreamHandler; import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.transport.*; +import org.elasticsearch.transport.ActionNotFoundTransportException; +import org.elasticsearch.transport.RemoteTransportException; +import org.elasticsearch.transport.ResponseHandlerFailureTransportException; +import org.elasticsearch.transport.TransportRequestHandler; +import org.elasticsearch.transport.TransportResponseHandler; +import org.elasticsearch.transport.TransportSerializationException; +import org.elasticsearch.transport.TransportServiceAdapter; import org.elasticsearch.transport.support.TransportStreams; import java.io.IOException; @@ -53,18 +62,11 @@ public class MessageChannelHandler extends SimpleChannelUpstreamHandler { this.logger = logger; } - @Override public void writeComplete(ChannelHandlerContext ctx, WriteCompletionEvent e) throws Exception { - transportServiceAdapter.sent(e.getWrittenAmount()); - super.writeComplete(ctx, e); - } - @Override public void messageReceived(ChannelHandlerContext ctx, MessageEvent event) throws Exception { ChannelBuffer buffer = (ChannelBuffer) event.getMessage(); int size = buffer.getInt(buffer.readerIndex() - 4); - transportServiceAdapter.received(size + 4); - int markedReaderIndex = buffer.readerIndex(); int expectedIndexReader = markedReaderIndex + size;