Node Stats: Remove low level transport stats from response, closes #979.

This commit is contained in:
kimchy 2011-05-30 12:53:03 +03:00
parent 358a4acf03
commit 31483e4a92
8 changed files with 27 additions and 184 deletions

View File

@ -28,7 +28,6 @@ import org.elasticsearch.monitor.jvm.JvmStats;
import org.elasticsearch.monitor.network.NetworkStats; import org.elasticsearch.monitor.network.NetworkStats;
import org.elasticsearch.monitor.os.OsStats; import org.elasticsearch.monitor.os.OsStats;
import org.elasticsearch.monitor.process.ProcessStats; import org.elasticsearch.monitor.process.ProcessStats;
import org.elasticsearch.transport.TransportStats;
import java.io.IOException; import java.io.IOException;
@ -49,21 +48,17 @@ public class NodeStats extends NodeOperationResponse {
private NetworkStats network; private NetworkStats network;
private TransportStats transport;
NodeStats() { NodeStats() {
} }
public NodeStats(DiscoveryNode node, NodeIndicesStats indices, public NodeStats(DiscoveryNode node, NodeIndicesStats indices,
OsStats os, ProcessStats process, JvmStats jvm, NetworkStats network, OsStats os, ProcessStats process, JvmStats jvm, NetworkStats network) {
TransportStats transport) {
super(node); super(node);
this.indices = indices; this.indices = indices;
this.os = os; this.os = os;
this.process = process; this.process = process;
this.jvm = jvm; this.jvm = jvm;
this.network = network; this.network = network;
this.transport = transport;
} }
/** /**
@ -136,14 +131,6 @@ public class NodeStats extends NodeOperationResponse {
return network(); return network();
} }
public TransportStats transport() {
return transport;
}
public TransportStats getTransport() {
return transport();
}
public static NodeStats readNodeStats(StreamInput in) throws IOException { public static NodeStats readNodeStats(StreamInput in) throws IOException {
NodeStats nodeInfo = new NodeStats(); NodeStats nodeInfo = new NodeStats();
nodeInfo.readFrom(in); nodeInfo.readFrom(in);
@ -167,9 +154,6 @@ public class NodeStats extends NodeOperationResponse {
if (in.readBoolean()) { if (in.readBoolean()) {
network = NetworkStats.readNetworkStats(in); network = NetworkStats.readNetworkStats(in);
} }
if (in.readBoolean()) {
transport = TransportStats.readTransportStats(in);
}
} }
@Override public void writeTo(StreamOutput out) throws IOException { @Override public void writeTo(StreamOutput out) throws IOException {
@ -204,11 +188,5 @@ public class NodeStats extends NodeOperationResponse {
out.writeBoolean(true); out.writeBoolean(true);
network.writeTo(out); network.writeTo(out);
} }
if (transport == null) {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
transport.writeTo(out);
}
} }
} }

View File

@ -95,8 +95,7 @@ public class TransportNodesStatsAction extends TransportNodesOperationAction<Nod
@Override protected NodeStats nodeOperation(NodeStatsRequest request) throws ElasticSearchException { @Override protected NodeStats nodeOperation(NodeStatsRequest request) throws ElasticSearchException {
return new NodeStats(clusterService.state().nodes().localNode(), indicesService.stats(), return new NodeStats(clusterService.state().nodes().localNode(), indicesService.stats(),
monitorService.osService().stats(), monitorService.processService().stats(), monitorService.osService().stats(), monitorService.processService().stats(),
monitorService.jvmService().stats(), monitorService.networkService().stats(), monitorService.jvmService().stats(), monitorService.networkService().stats());
transportService.stats());
} }
@Override protected boolean accumulateExceptions() { @Override protected boolean accumulateExceptions() {

View File

@ -27,7 +27,13 @@ import org.elasticsearch.client.Client;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.rest.*; import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestChannel;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.rest.XContentRestResponse;
import org.elasticsearch.rest.XContentThrowableRestResponse;
import org.elasticsearch.rest.action.support.RestActions; import org.elasticsearch.rest.action.support.RestActions;
import org.elasticsearch.rest.action.support.RestXContentBuilder; import org.elasticsearch.rest.action.support.RestXContentBuilder;
@ -77,9 +83,6 @@ public class RestNodesStatsAction extends BaseRestHandler {
if (nodeStats.network() != null) { if (nodeStats.network() != null) {
nodeStats.network().toXContent(builder, request); nodeStats.network().toXContent(builder, request);
} }
if (nodeStats.transport() != null) {
nodeStats.transport().toXContent(builder, request);
}
builder.endObject(); builder.endObject();
} }

View File

@ -60,11 +60,6 @@ public class TransportService extends AbstractLifecycleComponent<TransportServic
final CopyOnWriteArrayList<TransportConnectionListener> connectionListeners = new CopyOnWriteArrayList<TransportConnectionListener>(); final CopyOnWriteArrayList<TransportConnectionListener> connectionListeners = new CopyOnWriteArrayList<TransportConnectionListener>();
final AtomicLong rxBytes = new AtomicLong();
final AtomicLong rxCount = new AtomicLong();
final AtomicLong txBytes = new AtomicLong();
final AtomicLong txCount = new AtomicLong();
// An LRU (don't really care about concurrency here) that holds the latest timed out requests so if they // An LRU (don't really care about concurrency here) that holds the latest timed out requests so if they
// do show up, we can print more descriptive information about them // do show up, we can print more descriptive information about them
final Map<Long, TimeoutInfoHolder> timeoutInfoHandlers = Collections.synchronizedMap(new LinkedHashMap<Long, TimeoutInfoHolder>(100, .75F, true) { final Map<Long, TimeoutInfoHolder> timeoutInfoHandlers = Collections.synchronizedMap(new LinkedHashMap<Long, TimeoutInfoHolder>(100, .75F, true) {
@ -110,10 +105,6 @@ public class TransportService extends AbstractLifecycleComponent<TransportServic
return new TransportInfo(boundAddress()); return new TransportInfo(boundAddress());
} }
public TransportStats stats() {
return new TransportStats(rxCount.get(), rxBytes.get(), txCount.get(), txBytes.get());
}
public BoundTransportAddress boundAddress() { public BoundTransportAddress boundAddress() {
return transport.boundAddress(); return transport.boundAddress();
} }
@ -230,16 +221,6 @@ public class TransportService extends AbstractLifecycleComponent<TransportServic
class Adapter implements TransportServiceAdapter { class Adapter implements TransportServiceAdapter {
@Override public void received(long size) {
rxCount.getAndIncrement();
rxBytes.addAndGet(size);
}
@Override public void sent(long size) {
txCount.getAndIncrement();
txBytes.addAndGet(size);
}
@Override public TransportRequestHandler handler(String action) { @Override public TransportRequestHandler handler(String action) {
return serverHandlers.get(action); return serverHandlers.get(action);
} }

View File

@ -33,8 +33,4 @@ public interface TransportServiceAdapter {
void raiseNodeConnected(DiscoveryNode node); void raiseNodeConnected(DiscoveryNode node);
void raiseNodeDisconnected(DiscoveryNode node); void raiseNodeDisconnected(DiscoveryNode node);
void received(long size);
void sent(long size);
} }

View File

@ -1,118 +0,0 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.transport;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import java.io.IOException;
import java.io.Serializable;
/**
* @author kimchy (shay.banon)
*/
public class TransportStats implements Streamable, Serializable, ToXContent {
private long rxCount;
private long rxSize;
private long txCount;
private long txSize;
TransportStats() {
}
public TransportStats(long rxCount, long rxSize, long txCount, long txSize) {
this.rxCount = rxCount;
this.rxSize = rxSize;
this.txCount = txCount;
this.txSize = txSize;
}
@Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject("transport");
builder.field("rx_count", rxCount);
builder.field("rx_size", rxSize().toString());
builder.field("rx_size_in_bytes", rxSize);
builder.field("tx_count", txCount);
builder.field("tx_size", txSize().toString());
builder.field("tx_size_in_bytes", txSize);
builder.endObject();
return builder;
}
public static TransportStats readTransportStats(StreamInput in) throws IOException {
TransportStats stats = new TransportStats();
stats.readFrom(in);
return stats;
}
@Override public void readFrom(StreamInput in) throws IOException {
rxCount = in.readVLong();
rxSize = in.readVLong();
txCount = in.readVLong();
txSize = in.readVLong();
}
@Override public void writeTo(StreamOutput out) throws IOException {
out.writeVLong(rxCount);
out.writeVLong(rxSize);
out.writeVLong(txCount);
out.writeVLong(txSize);
}
public long rxCount() {
return rxCount;
}
public long getRxCount() {
return rxCount();
}
public ByteSizeValue rxSize() {
return new ByteSizeValue(rxSize);
}
public ByteSizeValue getRxSize() {
return rxSize();
}
public long txCount() {
return txCount;
}
public long getTxCount() {
return txCount();
}
public ByteSizeValue txSize() {
return new ByteSizeValue(txSize);
}
public ByteSizeValue getTxSize() {
return txSize();
}
}

View File

@ -25,7 +25,13 @@ import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.ThrowableObjectInputStream; import org.elasticsearch.common.io.ThrowableObjectInputStream;
import org.elasticsearch.common.io.stream.*; import org.elasticsearch.common.io.stream.BytesStreamInput;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.CachedStreamInput;
import org.elasticsearch.common.io.stream.CachedStreamOutput;
import org.elasticsearch.common.io.stream.HandlesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.settings.ImmutableSettings; import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.BoundTransportAddress; import org.elasticsearch.common.transport.BoundTransportAddress;
@ -154,8 +160,6 @@ public class LocalTransport extends AbstractLifecycleComponent<Transport> implem
final byte[] data = ((BytesStreamOutput) stream.wrappedOut()).copiedByteArray(); final byte[] data = ((BytesStreamOutput) stream.wrappedOut()).copiedByteArray();
transportServiceAdapter.sent(data.length);
threadPool.cached().execute(new Runnable() { threadPool.cached().execute(new Runnable() {
@Override public void run() { @Override public void run() {
targetTransport.messageReceived(data, action, LocalTransport.this, requestId); targetTransport.messageReceived(data, action, LocalTransport.this, requestId);
@ -171,8 +175,6 @@ public class LocalTransport extends AbstractLifecycleComponent<Transport> implem
} }
void messageReceived(byte[] data, String action, LocalTransport sourceTransport, @Nullable final Long sendRequestId) { void messageReceived(byte[] data, String action, LocalTransport sourceTransport, @Nullable final Long sendRequestId) {
transportServiceAdapter.received(data.length);
StreamInput stream = new BytesStreamInput(data); StreamInput stream = new BytesStreamInput(data);
stream = CachedStreamInput.cachedHandles(stream); stream = CachedStreamInput.cachedHandles(stream);

View File

@ -26,9 +26,18 @@ import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.Streamable; import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.netty.buffer.ChannelBuffer; import org.elasticsearch.common.netty.buffer.ChannelBuffer;
import org.elasticsearch.common.netty.channel.*; import org.elasticsearch.common.netty.channel.ChannelHandlerContext;
import org.elasticsearch.common.netty.channel.ExceptionEvent;
import org.elasticsearch.common.netty.channel.MessageEvent;
import org.elasticsearch.common.netty.channel.SimpleChannelUpstreamHandler;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.*; import org.elasticsearch.transport.ActionNotFoundTransportException;
import org.elasticsearch.transport.RemoteTransportException;
import org.elasticsearch.transport.ResponseHandlerFailureTransportException;
import org.elasticsearch.transport.TransportRequestHandler;
import org.elasticsearch.transport.TransportResponseHandler;
import org.elasticsearch.transport.TransportSerializationException;
import org.elasticsearch.transport.TransportServiceAdapter;
import org.elasticsearch.transport.support.TransportStreams; import org.elasticsearch.transport.support.TransportStreams;
import java.io.IOException; import java.io.IOException;
@ -53,18 +62,11 @@ public class MessageChannelHandler extends SimpleChannelUpstreamHandler {
this.logger = logger; this.logger = logger;
} }
@Override public void writeComplete(ChannelHandlerContext ctx, WriteCompletionEvent e) throws Exception {
transportServiceAdapter.sent(e.getWrittenAmount());
super.writeComplete(ctx, e);
}
@Override public void messageReceived(ChannelHandlerContext ctx, MessageEvent event) throws Exception { @Override public void messageReceived(ChannelHandlerContext ctx, MessageEvent event) throws Exception {
ChannelBuffer buffer = (ChannelBuffer) event.getMessage(); ChannelBuffer buffer = (ChannelBuffer) event.getMessage();
int size = buffer.getInt(buffer.readerIndex() - 4); int size = buffer.getInt(buffer.readerIndex() - 4);
transportServiceAdapter.received(size + 4);
int markedReaderIndex = buffer.readerIndex(); int markedReaderIndex = buffer.readerIndex();
int expectedIndexReader = markedReaderIndex + size; int expectedIndexReader = markedReaderIndex + size;