add transport info and stats to node info / stats api

This commit is contained in:
kimchy 2010-05-15 19:49:27 +03:00
parent b816ed5cf2
commit a005dc2c1f
13 changed files with 317 additions and 11 deletions

View File

@ -26,6 +26,7 @@ import org.elasticsearch.monitor.network.NetworkInfo;
import org.elasticsearch.monitor.os.OsInfo; import org.elasticsearch.monitor.os.OsInfo;
import org.elasticsearch.monitor.process.ProcessInfo; import org.elasticsearch.monitor.process.ProcessInfo;
import org.elasticsearch.threadpool.ThreadPoolInfo; import org.elasticsearch.threadpool.ThreadPoolInfo;
import org.elasticsearch.transport.TransportInfo;
import org.elasticsearch.util.collect.ImmutableMap; import org.elasticsearch.util.collect.ImmutableMap;
import org.elasticsearch.util.io.stream.StreamInput; import org.elasticsearch.util.io.stream.StreamInput;
import org.elasticsearch.util.io.stream.StreamOutput; import org.elasticsearch.util.io.stream.StreamOutput;
@ -56,11 +57,14 @@ public class NodeInfo extends NodeOperationResponse {
private ThreadPoolInfo threadPool; private ThreadPoolInfo threadPool;
private TransportInfo transport;
NodeInfo() { NodeInfo() {
} }
public NodeInfo(DiscoveryNode node, ImmutableMap<String, String> attributes, Settings settings, public NodeInfo(DiscoveryNode node, ImmutableMap<String, String> attributes, Settings settings,
OsInfo os, ProcessInfo process, JvmInfo jvm, NetworkInfo network, ThreadPoolInfo threadPool) { OsInfo os, ProcessInfo process, JvmInfo jvm, NetworkInfo network, ThreadPoolInfo threadPool,
TransportInfo transport) {
super(node); super(node);
this.attributes = attributes; this.attributes = attributes;
this.settings = settings; this.settings = settings;
@ -69,6 +73,7 @@ public class NodeInfo extends NodeOperationResponse {
this.jvm = jvm; this.jvm = jvm;
this.network = network; this.network = network;
this.threadPool = threadPool; this.threadPool = threadPool;
this.transport = transport;
} }
/** /**
@ -169,6 +174,14 @@ public class NodeInfo extends NodeOperationResponse {
return threadPool(); return threadPool();
} }
public TransportInfo transport() {
return transport;
}
public TransportInfo getTransport() {
return transport();
}
public static NodeInfo readNodeInfo(StreamInput in) throws IOException { public static NodeInfo readNodeInfo(StreamInput in) throws IOException {
NodeInfo nodeInfo = new NodeInfo(); NodeInfo nodeInfo = new NodeInfo();
nodeInfo.readFrom(in); nodeInfo.readFrom(in);
@ -199,6 +212,9 @@ public class NodeInfo extends NodeOperationResponse {
if (in.readBoolean()) { if (in.readBoolean()) {
threadPool = ThreadPoolInfo.readThreadPoolInfo(in); threadPool = ThreadPoolInfo.readThreadPoolInfo(in);
} }
if (in.readBoolean()) {
transport = TransportInfo.readTransportInfo(in);
}
} }
@Override public void writeTo(StreamOutput out) throws IOException { @Override public void writeTo(StreamOutput out) throws IOException {
@ -239,5 +255,11 @@ public class NodeInfo extends NodeOperationResponse {
out.writeBoolean(true); out.writeBoolean(true);
threadPool.writeTo(out); threadPool.writeTo(out);
} }
if (transport == null) {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
transport.writeTo(out);
}
} }
} }

View File

@ -100,7 +100,7 @@ public class TransportNodesInfoAction extends TransportNodesOperationAction<Node
return new NodeInfo(clusterService.state().nodes().localNode(), nodeAttributes, settings, return new NodeInfo(clusterService.state().nodes().localNode(), nodeAttributes, settings,
monitorService.osService().info(), monitorService.processService().info(), monitorService.osService().info(), monitorService.processService().info(),
monitorService.jvmService().info(), monitorService.networkService().info(), monitorService.jvmService().info(), monitorService.networkService().info(),
threadPool.info()); threadPool.info(), transportService.info());
} }
@Override protected boolean accumulateExceptions() { @Override protected boolean accumulateExceptions() {

View File

@ -26,6 +26,7 @@ import org.elasticsearch.monitor.network.NetworkStats;
import org.elasticsearch.monitor.os.OsStats; import org.elasticsearch.monitor.os.OsStats;
import org.elasticsearch.monitor.process.ProcessStats; import org.elasticsearch.monitor.process.ProcessStats;
import org.elasticsearch.threadpool.ThreadPoolStats; import org.elasticsearch.threadpool.ThreadPoolStats;
import org.elasticsearch.transport.TransportStats;
import org.elasticsearch.util.io.stream.StreamInput; import org.elasticsearch.util.io.stream.StreamInput;
import org.elasticsearch.util.io.stream.StreamOutput; import org.elasticsearch.util.io.stream.StreamOutput;
@ -48,18 +49,21 @@ public class NodeStats extends NodeOperationResponse {
private ThreadPoolStats threadPool; private ThreadPoolStats threadPool;
private TransportStats transport;
NodeStats() { NodeStats() {
} }
public NodeStats(DiscoveryNode node, public NodeStats(DiscoveryNode node,
OsStats os, ProcessStats process, JvmStats jvm, NetworkStats network, OsStats os, ProcessStats process, JvmStats jvm, NetworkStats network,
ThreadPoolStats threadPool) { ThreadPoolStats threadPool, TransportStats transport) {
super(node); super(node);
this.os = os; this.os = os;
this.process = process; this.process = process;
this.jvm = jvm; this.jvm = jvm;
this.network = network; this.network = network;
this.threadPool = threadPool; this.threadPool = threadPool;
this.transport = transport;
} }
/** /**
@ -132,6 +136,14 @@ public class NodeStats extends NodeOperationResponse {
return threadPool(); return threadPool();
} }
public TransportStats transport() {
return transport;
}
public TransportStats getTransport() {
return transport();
}
public static NodeStats readNodeStats(StreamInput in) throws IOException { public static NodeStats readNodeStats(StreamInput in) throws IOException {
NodeStats nodeInfo = new NodeStats(); NodeStats nodeInfo = new NodeStats();
nodeInfo.readFrom(in); nodeInfo.readFrom(in);
@ -155,6 +167,9 @@ public class NodeStats extends NodeOperationResponse {
if (in.readBoolean()) { if (in.readBoolean()) {
threadPool = ThreadPoolStats.readThreadPoolStats(in); threadPool = ThreadPoolStats.readThreadPoolStats(in);
} }
if (in.readBoolean()) {
transport = TransportStats.readTransportStats(in);
}
} }
@Override public void writeTo(StreamOutput out) throws IOException { @Override public void writeTo(StreamOutput out) throws IOException {
@ -189,5 +204,11 @@ public class NodeStats extends NodeOperationResponse {
out.writeBoolean(true); out.writeBoolean(true);
threadPool.writeTo(out); threadPool.writeTo(out);
} }
if (transport == null) {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
transport.writeTo(out);
}
} }
} }

View File

@ -88,7 +88,7 @@ public class TransportNodesStatsAction extends TransportNodesOperationAction<Nod
return new NodeStats(clusterService.state().nodes().localNode(), return new NodeStats(clusterService.state().nodes().localNode(),
monitorService.osService().stats(), monitorService.processService().stats(), monitorService.osService().stats(), monitorService.processService().stats(),
monitorService.jvmService().stats(), monitorService.networkService().stats(), monitorService.jvmService().stats(), monitorService.networkService().stats(),
threadPool.stats()); threadPool.stats(), transportService.stats());
} }
@Override protected boolean accumulateExceptions() { @Override protected boolean accumulateExceptions() {

View File

@ -104,6 +104,9 @@ public class RestNodesInfoAction extends BaseRestHandler {
if (nodeInfo.threadPool() != null) { if (nodeInfo.threadPool() != null) {
nodeInfo.threadPool().toXContent(builder, request); nodeInfo.threadPool().toXContent(builder, request);
} }
if (nodeInfo.transport() != null) {
nodeInfo.transport().toXContent(builder, request);
}
builder.endObject(); builder.endObject();
} }

View File

@ -76,6 +76,9 @@ public class RestNodesStatsAction extends BaseRestHandler {
if (nodeStats.threadPool() != null) { if (nodeStats.threadPool() != null) {
nodeStats.threadPool().toXContent(builder, request); nodeStats.threadPool().toXContent(builder, request);
} }
if (nodeStats.transport() != null) {
nodeStats.transport().toXContent(builder, request);
}
builder.endObject(); builder.endObject();
} }

View File

@ -0,0 +1,74 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.transport;
import org.elasticsearch.util.io.stream.StreamInput;
import org.elasticsearch.util.io.stream.StreamOutput;
import org.elasticsearch.util.io.stream.Streamable;
import org.elasticsearch.util.transport.BoundTransportAddress;
import org.elasticsearch.util.xcontent.ToXContent;
import org.elasticsearch.util.xcontent.builder.XContentBuilder;
import java.io.IOException;
import java.io.Serializable;
/**
* @author kimchy (shay.banon)
*/
public class TransportInfo implements Streamable, Serializable, ToXContent {
private BoundTransportAddress address;
TransportInfo() {
}
public TransportInfo(BoundTransportAddress address) {
this.address = address;
}
@Override public void toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject("transport");
builder.field("bound_address", address.boundAddress().toString());
builder.field("publish_address", address.publishAddress().toString());
builder.endObject();
}
public static TransportInfo readTransportInfo(StreamInput in) throws IOException {
TransportInfo info = new TransportInfo();
info.readFrom(in);
return info;
}
@Override public void readFrom(StreamInput in) throws IOException {
address = BoundTransportAddress.readBoundTransportAddress(in);
}
@Override public void writeTo(StreamOutput out) throws IOException {
address.writeTo(out);
}
public BoundTransportAddress address() {
return address;
}
public BoundTransportAddress getAddress() {
return address();
}
}

View File

@ -64,6 +64,14 @@ public class TransportService extends AbstractLifecycleComponent<TransportServic
final CopyOnWriteArrayList<TransportConnectionListener> connectionListeners = new CopyOnWriteArrayList<TransportConnectionListener>(); final CopyOnWriteArrayList<TransportConnectionListener> connectionListeners = new CopyOnWriteArrayList<TransportConnectionListener>();
final AtomicLong rxBytes = new AtomicLong();
final AtomicLong rxCount = new AtomicLong();
final AtomicLong txBytes = new AtomicLong();
final AtomicLong txCount = new AtomicLong();
// An LRU (don't really care about concurrency here) that holds the latest timed out requests so if they // An LRU (don't really care about concurrency here) that holds the latest timed out requests so if they
// do show up, we can print more descriptive information about them // do show up, we can print more descriptive information about them
final Map<Long, TimeoutInfoHolder> timeoutInfoHandlers = Collections.synchronizedMap(new LinkedHashMap<Long, TimeoutInfoHolder>(100, .75F, true) { final Map<Long, TimeoutInfoHolder> timeoutInfoHandlers = Collections.synchronizedMap(new LinkedHashMap<Long, TimeoutInfoHolder>(100, .75F, true) {
@ -106,6 +114,14 @@ public class TransportService extends AbstractLifecycleComponent<TransportServic
return transport.addressSupported(address); return transport.addressSupported(address);
} }
public TransportInfo info() {
return new TransportInfo(boundAddress());
}
public TransportStats stats() {
return new TransportStats(rxCount.get(), rxBytes.get(), txCount.get(), txBytes.get());
}
public BoundTransportAddress boundAddress() { public BoundTransportAddress boundAddress() {
return transport.boundAddress(); return transport.boundAddress();
} }
@ -211,6 +227,17 @@ public class TransportService extends AbstractLifecycleComponent<TransportServic
} }
class Adapter implements TransportServiceAdapter { class Adapter implements TransportServiceAdapter {
@Override public void received(long size) {
rxCount.getAndIncrement();
rxBytes.addAndGet(size);
}
@Override public void sent(long size) {
txCount.getAndIncrement();
txBytes.addAndGet(size);
}
@Override public TransportRequestHandler handler(String action) { @Override public TransportRequestHandler handler(String action) {
return serverHandlers.get(action); return serverHandlers.get(action);
} }

View File

@ -33,4 +33,8 @@ public interface TransportServiceAdapter {
void raiseNodeConnected(DiscoveryNode node); void raiseNodeConnected(DiscoveryNode node);
void raiseNodeDisconnected(DiscoveryNode node); void raiseNodeDisconnected(DiscoveryNode node);
void received(long size);
void sent(long size);
} }

View File

@ -0,0 +1,117 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.transport;
import org.elasticsearch.util.SizeValue;
import org.elasticsearch.util.io.stream.StreamInput;
import org.elasticsearch.util.io.stream.StreamOutput;
import org.elasticsearch.util.io.stream.Streamable;
import org.elasticsearch.util.xcontent.ToXContent;
import org.elasticsearch.util.xcontent.builder.XContentBuilder;
import java.io.IOException;
import java.io.Serializable;
/**
* @author kimchy (shay.banon)
*/
public class TransportStats implements Streamable, Serializable, ToXContent {
private long rxCount;
private long rxSize;
private long txCount;
private long txSize;
TransportStats() {
}
public TransportStats(long rxCount, long rxSize, long txCount, long txSize) {
this.rxCount = rxCount;
this.rxSize = rxSize;
this.txCount = txCount;
this.txSize = txSize;
}
@Override public void toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject("transport");
builder.field("rx_count", rxCount);
builder.field("rx_size", rxSize().toString());
builder.field("rx_size_in_bytes", rxSize);
builder.field("tx_count", txCount);
builder.field("tx_size", txSize().toString());
builder.field("tx_size_in_bytes", txSize);
builder.endObject();
}
public static TransportStats readTransportStats(StreamInput in) throws IOException {
TransportStats stats = new TransportStats();
stats.readFrom(in);
return stats;
}
@Override public void readFrom(StreamInput in) throws IOException {
rxCount = in.readVLong();
rxSize = in.readVLong();
txCount = in.readVLong();
txSize = in.readVLong();
}
@Override public void writeTo(StreamOutput out) throws IOException {
out.writeVLong(rxCount);
out.writeVLong(rxSize);
out.writeVLong(txCount);
out.writeVLong(txSize);
}
public long rxCount() {
return rxCount;
}
public long getRxCount() {
return rxCount();
}
public SizeValue rxSize() {
return new SizeValue(rxSize);
}
public SizeValue getRxSize() {
return rxSize();
}
public long txCount() {
return txCount;
}
public long getTxCount() {
return txCount();
}
public SizeValue txSize() {
return new SizeValue(txSize);
}
public SizeValue getTxSize() {
return txSize();
}
}

View File

@ -152,6 +152,9 @@ public class LocalTransport extends AbstractLifecycleComponent<Transport> implem
} }
final byte[] data = ((BytesStreamOutput) stream.wrappedOut()).copiedByteArray(); final byte[] data = ((BytesStreamOutput) stream.wrappedOut()).copiedByteArray();
transportServiceAdapter.sent(data.length);
threadPool.execute(new Runnable() { threadPool.execute(new Runnable() {
@Override public void run() { @Override public void run() {
targetTransport.messageReceived(data, action, LocalTransport.this, handler); targetTransport.messageReceived(data, action, LocalTransport.this, handler);
@ -164,6 +167,8 @@ public class LocalTransport extends AbstractLifecycleComponent<Transport> implem
} }
void messageReceived(byte[] data, String action, LocalTransport sourceTransport, @Nullable final TransportResponseHandler responseHandler) { void messageReceived(byte[] data, String action, LocalTransport sourceTransport, @Nullable final TransportResponseHandler responseHandler) {
transportServiceAdapter.received(data.length);
StreamInput stream = new BytesStreamInput(data); StreamInput stream = new BytesStreamInput(data);
stream = HandlesStreamInput.Cached.cached(stream); stream = HandlesStreamInput.Cached.cached(stream);

View File

@ -27,10 +27,7 @@ import org.elasticsearch.util.io.stream.StreamInput;
import org.elasticsearch.util.io.stream.Streamable; import org.elasticsearch.util.io.stream.Streamable;
import org.elasticsearch.util.logging.ESLogger; import org.elasticsearch.util.logging.ESLogger;
import org.jboss.netty.buffer.ChannelBuffer; import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.ChannelHandlerContext; import org.jboss.netty.channel.*;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
import java.io.IOException; import java.io.IOException;
@ -56,10 +53,18 @@ public class MessageChannelHandler extends SimpleChannelUpstreamHandler {
this.logger = logger; this.logger = logger;
} }
@Override public void writeComplete(ChannelHandlerContext ctx, WriteCompletionEvent e) throws Exception {
transportServiceAdapter.sent(e.getWrittenAmount());
super.writeComplete(ctx, e);
}
@Override public void messageReceived(ChannelHandlerContext ctx, MessageEvent event) throws Exception { @Override public void messageReceived(ChannelHandlerContext ctx, MessageEvent event) throws Exception {
ChannelBuffer buffer = (ChannelBuffer) event.getMessage(); ChannelBuffer buffer = (ChannelBuffer) event.getMessage();
int size = buffer.getInt(buffer.readerIndex() - 4); int size = buffer.getInt(buffer.readerIndex() - 4);
transportServiceAdapter.received(size);
int markedReaderIndex = buffer.readerIndex(); int markedReaderIndex = buffer.readerIndex();
int expectedIndexReader = markedReaderIndex + size; int expectedIndexReader = markedReaderIndex + size;

View File

@ -19,6 +19,12 @@
package org.elasticsearch.util.transport; package org.elasticsearch.util.transport;
import org.elasticsearch.util.io.stream.StreamInput;
import org.elasticsearch.util.io.stream.StreamOutput;
import org.elasticsearch.util.io.stream.Streamable;
import java.io.IOException;
/** /**
* A bounded transport address is a tuple of two {@link TransportAddress}, one that represents * A bounded transport address is a tuple of two {@link TransportAddress}, one that represents
* the address the transport is bounded on, the the published one represents the one clients should * the address the transport is bounded on, the the published one represents the one clients should
@ -26,11 +32,14 @@ package org.elasticsearch.util.transport;
* *
* @author kimchy (shay.banon) * @author kimchy (shay.banon)
*/ */
public class BoundTransportAddress { public class BoundTransportAddress implements Streamable {
private final TransportAddress boundAddress; private TransportAddress boundAddress;
private final TransportAddress publishAddress; private TransportAddress publishAddress;
BoundTransportAddress() {
}
public BoundTransportAddress(TransportAddress boundAddress, TransportAddress publishAddress) { public BoundTransportAddress(TransportAddress boundAddress, TransportAddress publishAddress) {
this.boundAddress = boundAddress; this.boundAddress = boundAddress;
@ -45,6 +54,22 @@ public class BoundTransportAddress {
return publishAddress; return publishAddress;
} }
public static BoundTransportAddress readBoundTransportAddress(StreamInput in) throws IOException {
BoundTransportAddress addr = new BoundTransportAddress();
addr.readFrom(in);
return addr;
}
@Override public void readFrom(StreamInput in) throws IOException {
boundAddress = TransportAddressSerializers.addressFromStream(in);
publishAddress = TransportAddressSerializers.addressFromStream(in);
}
@Override public void writeTo(StreamOutput out) throws IOException {
TransportAddressSerializers.addressToStream(out, boundAddress);
TransportAddressSerializers.addressToStream(out, publishAddress);
}
@Override public String toString() { @Override public String toString() {
return "bound_address[" + boundAddress + "], publish_address[" + publishAddress + "]"; return "bound_address[" + boundAddress + "], publish_address[" + publishAddress + "]";
} }