Enhance transport stats to include rx and tx counters, closes #1254.

This commit is contained in:
Shay Banon 2011-08-20 21:37:37 +03:00
parent ef47308ead
commit 6afbfad089
15 changed files with 276 additions and 79 deletions

View File

@ -0,0 +1,65 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.benchmark.counter;
import org.elasticsearch.common.StopWatch;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicLong;
/**
* @author kimchy (Shay Banon)
*/
public class SimpleCounterBenchmark {
private static long NUMBER_OF_ITERATIONS = 10000000;
private static int NUMBER_OF_THREADS = 100;
public static void main(String[] args) throws Exception {
final AtomicLong counter = new AtomicLong();
StopWatch stopWatch = new StopWatch().start();
System.out.println("Running " + NUMBER_OF_ITERATIONS);
for (long i = 0; i < NUMBER_OF_ITERATIONS; i++) {
counter.incrementAndGet();
}
System.out.println("Took " + stopWatch.stop().totalTime() + " TP Millis " + (NUMBER_OF_ITERATIONS / stopWatch.totalTime().millisFrac()));
System.out.println("Running using " + NUMBER_OF_THREADS + " threads with " + NUMBER_OF_ITERATIONS + " iterations");
final CountDownLatch latch = new CountDownLatch(NUMBER_OF_THREADS);
Thread[] threads = new Thread[NUMBER_OF_THREADS];
for (int i = 0; i < threads.length; i++) {
threads[i] = new Thread(new Runnable() {
@Override public void run() {
for (long i = 0; i < NUMBER_OF_ITERATIONS; i++) {
counter.incrementAndGet();
}
latch.countDown();
}
});
}
stopWatch = new StopWatch().start();
for (Thread thread : threads) {
thread.start();
}
latch.await();
stopWatch.stop();
System.out.println("Took " + stopWatch.totalTime() + " TP Millis " + ((NUMBER_OF_ITERATIONS * NUMBER_OF_THREADS) / stopWatch.totalTime().millisFrac()));
}
}

View File

@ -25,13 +25,9 @@ import org.elasticsearch.action.support.nodes.NodeOperationRequest;
import org.elasticsearch.action.support.nodes.TransportNodesOperationAction; import org.elasticsearch.action.support.nodes.TransportNodesOperationAction;
import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.collect.ImmutableMap;
import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.http.HttpServer; import org.elasticsearch.node.service.NodeService;
import org.elasticsearch.monitor.MonitorService;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.TransportService;
@ -44,29 +40,13 @@ import java.util.concurrent.atomic.AtomicReferenceArray;
*/ */
public class TransportNodesInfoAction extends TransportNodesOperationAction<NodesInfoRequest, NodesInfoResponse, TransportNodesInfoAction.NodeInfoRequest, NodeInfo> { public class TransportNodesInfoAction extends TransportNodesOperationAction<NodesInfoRequest, NodesInfoResponse, TransportNodesInfoAction.NodeInfoRequest, NodeInfo> {
private final MonitorService monitorService; private final NodeService nodeService;
private volatile ImmutableMap<String, String> nodeAttributes = ImmutableMap.of();
@Nullable private HttpServer httpServer;
@Inject public TransportNodesInfoAction(Settings settings, ClusterName clusterName, ThreadPool threadPool, @Inject public TransportNodesInfoAction(Settings settings, ClusterName clusterName, ThreadPool threadPool,
ClusterService clusterService, TransportService transportService, ClusterService clusterService, TransportService transportService,
MonitorService monitorService) { NodeService nodeService) {
super(settings, clusterName, threadPool, clusterService, transportService); super(settings, clusterName, threadPool, clusterService, transportService);
this.monitorService = monitorService; this.nodeService = nodeService;
}
public void setHttpServer(@Nullable HttpServer httpServer) {
this.httpServer = httpServer;
}
public synchronized void putNodeAttribute(String key, String value) {
nodeAttributes = new MapBuilder<String, String>().putAll(nodeAttributes).put(key, value).immutableMap();
}
public synchronized void removeNodeAttribute(String key) {
nodeAttributes = new MapBuilder<String, String>().putAll(nodeAttributes).remove(key).immutableMap();
} }
@Override protected String executor() { @Override protected String executor() {
@ -109,10 +89,7 @@ public class TransportNodesInfoAction extends TransportNodesOperationAction<Node
} }
@Override protected NodeInfo nodeOperation(NodeInfoRequest nodeInfoRequest) throws ElasticSearchException { @Override protected NodeInfo nodeOperation(NodeInfoRequest nodeInfoRequest) throws ElasticSearchException {
return new NodeInfo(clusterService.state().nodes().localNode(), nodeAttributes, settings, return nodeService.info();
monitorService.osService().info(), monitorService.processService().info(),
monitorService.jvmService().info(), monitorService.networkService().info(),
transportService.info(), httpServer == null ? null : httpServer.info());
} }
@Override protected boolean accumulateExceptions() { @Override protected boolean accumulateExceptions() {

View File

@ -25,13 +25,10 @@ import org.elasticsearch.action.support.nodes.NodeOperationRequest;
import org.elasticsearch.action.support.nodes.TransportNodesOperationAction; import org.elasticsearch.action.support.nodes.TransportNodesOperationAction;
import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.collect.Lists; import org.elasticsearch.common.collect.Lists;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.http.HttpServer; import org.elasticsearch.node.service.NodeService;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.monitor.MonitorService;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.TransportService;
@ -43,22 +40,13 @@ import java.util.concurrent.atomic.AtomicReferenceArray;
*/ */
public class TransportNodesStatsAction extends TransportNodesOperationAction<NodesStatsRequest, NodesStatsResponse, TransportNodesStatsAction.NodeStatsRequest, NodeStats> { public class TransportNodesStatsAction extends TransportNodesOperationAction<NodesStatsRequest, NodesStatsResponse, TransportNodesStatsAction.NodeStatsRequest, NodeStats> {
private final MonitorService monitorService; private final NodeService nodeService;
private final IndicesService indicesService;
@Nullable private HttpServer httpServer;
@Inject public TransportNodesStatsAction(Settings settings, ClusterName clusterName, ThreadPool threadPool, @Inject public TransportNodesStatsAction(Settings settings, ClusterName clusterName, ThreadPool threadPool,
ClusterService clusterService, TransportService transportService, ClusterService clusterService, TransportService transportService,
MonitorService monitorService, IndicesService indicesService) { NodeService nodeService) {
super(settings, clusterName, threadPool, clusterService, transportService); super(settings, clusterName, threadPool, clusterService, transportService);
this.monitorService = monitorService; this.nodeService = nodeService;
this.indicesService = indicesService;
}
public void setHttpServer(@Nullable HttpServer httpServer) {
this.httpServer = httpServer;
} }
@Override protected String executor() { @Override protected String executor() {
@ -101,10 +89,7 @@ public class TransportNodesStatsAction extends TransportNodesOperationAction<Nod
} }
@Override protected NodeStats nodeOperation(NodeStatsRequest request) throws ElasticSearchException { @Override protected NodeStats nodeOperation(NodeStatsRequest request) throws ElasticSearchException {
return new NodeStats(clusterService.state().nodes().localNode(), indicesService.stats(), return nodeService.stats();
monitorService.osService().stats(), monitorService.processService().stats(),
monitorService.jvmService().stats(), monitorService.networkService().stats(),
transportService.stats(), httpServer == null ? null : httpServer.stats());
} }
@Override protected boolean accumulateExceptions() { @Override protected boolean accumulateExceptions() {

View File

@ -20,14 +20,13 @@
package org.elasticsearch.http; package org.elasticsearch.http;
import org.elasticsearch.ElasticSearchException; import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.action.admin.cluster.node.info.TransportNodesInfoAction;
import org.elasticsearch.action.admin.cluster.node.stats.TransportNodesStatsAction;
import org.elasticsearch.common.collect.ImmutableMap; import org.elasticsearch.common.collect.ImmutableMap;
import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.Streams; import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.env.Environment; import org.elasticsearch.env.Environment;
import org.elasticsearch.node.service.NodeService;
import org.elasticsearch.rest.BytesRestResponse; import org.elasticsearch.rest.BytesRestResponse;
import org.elasticsearch.rest.RestController; import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestRequest; import org.elasticsearch.rest.RestRequest;
@ -51,21 +50,19 @@ public class HttpServer extends AbstractLifecycleComponent<HttpServer> {
private final RestController restController; private final RestController restController;
private final TransportNodesInfoAction nodesInfoAction; private final NodeService nodeService;
private final boolean disableSites; private final boolean disableSites;
@Inject public HttpServer(Settings settings, Environment environment, HttpServerTransport transport, @Inject public HttpServer(Settings settings, Environment environment, HttpServerTransport transport,
RestController restController, RestController restController,
TransportNodesInfoAction nodesInfoAction, TransportNodesStatsAction nodesStatsAction) { NodeService nodeService) {
super(settings); super(settings);
this.environment = environment; this.environment = environment;
this.transport = transport; this.transport = transport;
this.restController = restController; this.restController = restController;
this.nodesInfoAction = nodesInfoAction; this.nodeService = nodeService;
this.nodesInfoAction.setHttpServer(this); nodeService.setHttpServer(this);
nodesStatsAction.setHttpServer(this);
this.disableSites = componentSettings.getAsBoolean("disable_sites", false); this.disableSites = componentSettings.getAsBoolean("disable_sites", false);
@ -90,11 +87,11 @@ public class HttpServer extends AbstractLifecycleComponent<HttpServer> {
if (logger.isInfoEnabled()) { if (logger.isInfoEnabled()) {
logger.info("{}", transport.boundAddress()); logger.info("{}", transport.boundAddress());
} }
nodesInfoAction.putNodeAttribute("http_address", transport.boundAddress().publishAddress().toString()); nodeService.putNodeAttribute("http_address", transport.boundAddress().publishAddress().toString());
} }
@Override protected void doStop() throws ElasticSearchException { @Override protected void doStop() throws ElasticSearchException {
nodesInfoAction.removeNodeAttribute("http_address"); nodeService.removeNodeAttribute("http_address");
transport.stop(); transport.stop();
} }

View File

@ -21,6 +21,7 @@ package org.elasticsearch.node.internal;
import org.elasticsearch.common.inject.AbstractModule; import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.node.Node; import org.elasticsearch.node.Node;
import org.elasticsearch.node.service.NodeService;
import org.elasticsearch.node.settings.NodeSettingsService; import org.elasticsearch.node.settings.NodeSettingsService;
/** /**
@ -37,5 +38,6 @@ public class NodeModule extends AbstractModule {
@Override protected void configure() { @Override protected void configure() {
bind(Node.class).toInstance(node); bind(Node.class).toInstance(node);
bind(NodeSettingsService.class).asEagerSingleton(); bind(NodeSettingsService.class).asEagerSingleton();
bind(NodeService.class).asEagerSingleton();
} }
} }

View File

@ -0,0 +1,85 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.node.service;
import org.elasticsearch.action.admin.cluster.node.info.NodeInfo;
import org.elasticsearch.action.admin.cluster.node.stats.NodeStats;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.collect.ImmutableMap;
import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.http.HttpServer;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.monitor.MonitorService;
import org.elasticsearch.transport.TransportService;
/**
*/
public class NodeService extends AbstractComponent {
private final MonitorService monitorService;
private final ClusterService clusterService;
private final TransportService transportService;
private final IndicesService indicesService;
@Nullable private HttpServer httpServer;
private volatile ImmutableMap<String, String> nodeAttributes = ImmutableMap.of();
@Inject public NodeService(Settings settings, MonitorService monitorService, ClusterService clusterService, TransportService transportService, IndicesService indicesService) {
super(settings);
this.monitorService = monitorService;
this.clusterService = clusterService;
this.transportService = transportService;
this.indicesService = indicesService;
}
public void setHttpServer(@Nullable HttpServer httpServer) {
this.httpServer = httpServer;
}
public synchronized void putNodeAttribute(String key, String value) {
nodeAttributes = new MapBuilder<String, String>().putAll(nodeAttributes).put(key, value).immutableMap();
}
public synchronized void removeNodeAttribute(String key) {
nodeAttributes = new MapBuilder<String, String>().putAll(nodeAttributes).remove(key).immutableMap();
}
public NodeInfo info() {
return new NodeInfo(clusterService.state().nodes().localNode(), nodeAttributes, settings,
monitorService.osService().info(), monitorService.processService().info(),
monitorService.jvmService().info(), monitorService.networkService().info(),
transportService.info(), httpServer == null ? null : httpServer.info());
}
public NodeStats stats() {
return new NodeStats(clusterService.state().nodes().localNode(), indicesService.stats(),
monitorService.osService().stats(), monitorService.processService().stats(),
monitorService.jvmService().stats(), monitorService.networkService().stats(),
transportService.stats(), httpServer == null ? null : httpServer.stats());
}
}

View File

@ -75,5 +75,5 @@ public interface Transport extends LifecycleComponent<Transport> {
*/ */
<T extends Streamable> void sendRequest(DiscoveryNode node, long requestId, String action, Streamable message, TransportRequestOptions options) throws IOException, TransportException; <T extends Streamable> void sendRequest(DiscoveryNode node, long requestId, String action, Streamable message, TransportRequestOptions options) throws IOException, TransportException;
TransportStats stats(); long serverOpen();
} }

View File

@ -69,6 +69,7 @@ public class TransportService extends AbstractLifecycleComponent<TransportServic
}); });
private boolean throwConnectException = false; private boolean throwConnectException = false;
private TransportService.Adapter adapter;
public TransportService(Transport transport, ThreadPool threadPool) { public TransportService(Transport transport, ThreadPool threadPool) {
this(EMPTY_SETTINGS, transport, threadPool); this(EMPTY_SETTINGS, transport, threadPool);
@ -82,7 +83,8 @@ public class TransportService extends AbstractLifecycleComponent<TransportServic
@Override protected void doStart() throws ElasticSearchException { @Override protected void doStart() throws ElasticSearchException {
// register us as an adapter for the transport service // register us as an adapter for the transport service
transport.transportServiceAdapter(new Adapter()); adapter = new Adapter();
transport.transportServiceAdapter(adapter);
transport.start(); transport.start();
if (transport.boundAddress() != null && logger.isInfoEnabled()) { if (transport.boundAddress() != null && logger.isInfoEnabled()) {
logger.info("{}", transport.boundAddress()); logger.info("{}", transport.boundAddress());
@ -106,7 +108,7 @@ public class TransportService extends AbstractLifecycleComponent<TransportServic
} }
public TransportStats stats() { public TransportStats stats() {
return transport.stats(); return new TransportStats(transport.serverOpen(), adapter.rxCount.get(), adapter.rxSize.get(), adapter.txCount.get(), adapter.txSize.get());
} }
public BoundTransportAddress boundAddress() { public BoundTransportAddress boundAddress() {
@ -229,6 +231,21 @@ public class TransportService extends AbstractLifecycleComponent<TransportServic
class Adapter implements TransportServiceAdapter { class Adapter implements TransportServiceAdapter {
final AtomicLong rxCount = new AtomicLong();
final AtomicLong rxSize = new AtomicLong();
final AtomicLong txCount = new AtomicLong();
final AtomicLong txSize = new AtomicLong();
@Override public void received(long size) {
rxCount.incrementAndGet();
rxSize.addAndGet(size);
}
@Override public void sent(long size) {
txCount.incrementAndGet();
txSize.addAndGet(size);
}
@Override public TransportRequestHandler handler(String action) { @Override public TransportRequestHandler handler(String action) {
return serverHandlers.get(action); return serverHandlers.get(action);
} }

View File

@ -26,6 +26,10 @@ import org.elasticsearch.cluster.node.DiscoveryNode;
*/ */
public interface TransportServiceAdapter { public interface TransportServiceAdapter {
void received(long size);
void sent(long size);
TransportRequestHandler handler(String action); TransportRequestHandler handler(String action);
TransportResponseHandler remove(long requestId); TransportResponseHandler remove(long requestId);

View File

@ -22,6 +22,7 @@ package org.elasticsearch.transport;
import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable; import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentBuilder;
@ -30,13 +31,21 @@ import java.io.IOException;
public class TransportStats implements Streamable, ToXContent { public class TransportStats implements Streamable, ToXContent {
private long serverOpen; private long serverOpen;
private long rxCount;
private long rxSize;
private long txCount;
private long txSize;
TransportStats() { TransportStats() {
} }
public TransportStats(long serverOpen) { public TransportStats(long serverOpen, long rxCount, long rxSize, long txCount, long txSize) {
this.serverOpen = serverOpen; this.serverOpen = serverOpen;
this.rxCount = rxCount;
this.rxSize = rxSize;
this.txCount = txCount;
this.txSize = txSize;
} }
public long serverOpen() { public long serverOpen() {
@ -47,6 +56,38 @@ public class TransportStats implements Streamable, ToXContent {
return serverOpen(); return serverOpen();
} }
public long rxCount() {
return rxCount;
}
public long getRxCount() {
return rxCount();
}
public ByteSizeValue rxSize() {
return new ByteSizeValue(rxSize);
}
public ByteSizeValue getRxSize() {
return rxSize();
}
public long txCount() {
return txCount;
}
public long getTxCount() {
return txCount();
}
public ByteSizeValue txSize() {
return new ByteSizeValue(txSize);
}
public ByteSizeValue getTxSize() {
return txSize();
}
public static TransportStats readTransportStats(StreamInput in) throws IOException { public static TransportStats readTransportStats(StreamInput in) throws IOException {
TransportStats stats = new TransportStats(); TransportStats stats = new TransportStats();
stats.readFrom(in); stats.readFrom(in);
@ -55,15 +96,29 @@ public class TransportStats implements Streamable, ToXContent {
@Override public void readFrom(StreamInput in) throws IOException { @Override public void readFrom(StreamInput in) throws IOException {
serverOpen = in.readVLong(); serverOpen = in.readVLong();
rxCount = in.readVLong();
rxSize = in.readVLong();
txCount = in.readVLong();
txSize = in.readVLong();
} }
@Override public void writeTo(StreamOutput out) throws IOException { @Override public void writeTo(StreamOutput out) throws IOException {
out.writeVLong(serverOpen); out.writeVLong(serverOpen);
out.writeVLong(rxCount);
out.writeVLong(rxSize);
out.writeVLong(txCount);
out.writeVLong(txSize);
} }
@Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject("transport"); builder.startObject("transport");
builder.field("server_open", serverOpen); builder.field("server_open", serverOpen);
builder.field("rx_count", rxCount);
builder.field("rx_size", rxSize().toString());
builder.field("rx_size_in_bytes", rxSize);
builder.field("tx_count", txCount);
builder.field("tx_size", txSize().toString());
builder.field("tx_size_in_bytes", txSize);
builder.endObject(); builder.endObject();
return builder; return builder;
} }

View File

@ -144,8 +144,8 @@ public class LocalTransport extends AbstractLifecycleComponent<Transport> implem
} }
} }
@Override public TransportStats stats() { @Override public long serverOpen() {
return new TransportStats(0); return 0;
} }
@Override public <T extends Streamable> void sendRequest(final DiscoveryNode node, final long requestId, final String action, final Streamable message, TransportRequestOptions options) throws IOException, TransportException { @Override public <T extends Streamable> void sendRequest(final DiscoveryNode node, final long requestId, final String action, final Streamable message, TransportRequestOptions options) throws IOException, TransportException {
@ -168,6 +168,8 @@ public class LocalTransport extends AbstractLifecycleComponent<Transport> implem
final byte[] data = ((BytesStreamOutput) stream.wrappedOut()).copiedByteArray(); final byte[] data = ((BytesStreamOutput) stream.wrappedOut()).copiedByteArray();
transportServiceAdapter.sent(data.length);
threadPool.cached().execute(new Runnable() { threadPool.cached().execute(new Runnable() {
@Override public void run() { @Override public void run() {
targetTransport.messageReceived(data, action, LocalTransport.this, requestId); targetTransport.messageReceived(data, action, LocalTransport.this, requestId);
@ -183,6 +185,7 @@ public class LocalTransport extends AbstractLifecycleComponent<Transport> implem
} }
void messageReceived(byte[] data, String action, LocalTransport sourceTransport, @Nullable final Long sendRequestId) { void messageReceived(byte[] data, String action, LocalTransport sourceTransport, @Nullable final Long sendRequestId) {
transportServiceAdapter.received(data.length);
StreamInput stream = new BytesStreamInput(data); StreamInput stream = new BytesStreamInput(data);
stream = CachedStreamInput.cachedHandles(stream); stream = CachedStreamInput.cachedHandles(stream);

View File

@ -30,6 +30,7 @@ import org.elasticsearch.common.netty.channel.ChannelHandlerContext;
import org.elasticsearch.common.netty.channel.ExceptionEvent; import org.elasticsearch.common.netty.channel.ExceptionEvent;
import org.elasticsearch.common.netty.channel.MessageEvent; import org.elasticsearch.common.netty.channel.MessageEvent;
import org.elasticsearch.common.netty.channel.SimpleChannelUpstreamHandler; import org.elasticsearch.common.netty.channel.SimpleChannelUpstreamHandler;
import org.elasticsearch.common.netty.channel.WriteCompletionEvent;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.ActionNotFoundTransportException; import org.elasticsearch.transport.ActionNotFoundTransportException;
import org.elasticsearch.transport.RemoteTransportException; import org.elasticsearch.transport.RemoteTransportException;
@ -62,11 +63,18 @@ public class MessageChannelHandler extends SimpleChannelUpstreamHandler {
this.logger = logger; this.logger = logger;
} }
@Override public void writeComplete(ChannelHandlerContext ctx, WriteCompletionEvent e) throws Exception {
transportServiceAdapter.sent(e.getWrittenAmount());
super.writeComplete(ctx, e);
}
@Override public void messageReceived(ChannelHandlerContext ctx, MessageEvent event) throws Exception { @Override public void messageReceived(ChannelHandlerContext ctx, MessageEvent event) throws Exception {
ChannelBuffer buffer = (ChannelBuffer) event.getMessage(); ChannelBuffer buffer = (ChannelBuffer) event.getMessage();
int size = buffer.getInt(buffer.readerIndex() - 4); int size = buffer.getInt(buffer.readerIndex() - 4);
transportServiceAdapter.received(size + 4);
int markedReaderIndex = buffer.readerIndex(); int markedReaderIndex = buffer.readerIndex();
int expectedIndexReader = markedReaderIndex + size; int expectedIndexReader = markedReaderIndex + size;

View File

@ -65,7 +65,6 @@ import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportException; import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportRequestOptions; import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportServiceAdapter; import org.elasticsearch.transport.TransportServiceAdapter;
import org.elasticsearch.transport.TransportStats;
import org.elasticsearch.transport.support.TransportStreams; import org.elasticsearch.transport.support.TransportStreams;
import java.io.IOException; import java.io.IOException;
@ -442,9 +441,9 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
return new InetSocketTransportAddress((InetSocketAddress) socketAddress); return new InetSocketTransportAddress((InetSocketAddress) socketAddress);
} }
@Override public TransportStats stats() { @Override public long serverOpen() {
OpenChannelsHandler channels = serverOpenChannels; OpenChannelsHandler channels = serverOpenChannels;
return new TransportStats(channels == null ? 0 : channels.numberOfOpenChannels()); return channels == null ? 0 : channels.numberOfOpenChannels();
} }
@Override public <T extends Streamable> void sendRequest(final DiscoveryNode node, final long requestId, final String action, final Streamable message, TransportRequestOptions options) throws IOException, TransportException { @Override public <T extends Streamable> void sendRequest(final DiscoveryNode node, final long requestId, final String action, final Streamable message, TransportRequestOptions options) throws IOException, TransportException {

View File

@ -20,10 +20,10 @@
package org.elasticsearch.memcached; package org.elasticsearch.memcached;
import org.elasticsearch.ElasticSearchException; import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.action.admin.cluster.node.info.TransportNodesInfoAction;
import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.node.service.NodeService;
import org.elasticsearch.rest.RestController; import org.elasticsearch.rest.RestController;
/** /**
@ -33,16 +33,16 @@ public class MemcachedServer extends AbstractLifecycleComponent<MemcachedServer>
private final MemcachedServerTransport transport; private final MemcachedServerTransport transport;
private final TransportNodesInfoAction nodesInfoAction; private final NodeService nodeService;
private final RestController restController; private final RestController restController;
@Inject public MemcachedServer(Settings settings, MemcachedServerTransport transport, @Inject public MemcachedServer(Settings settings, MemcachedServerTransport transport,
RestController restController, TransportNodesInfoAction nodesInfoAction) { RestController restController, NodeService nodeService) {
super(settings); super(settings);
this.transport = transport; this.transport = transport;
this.restController = restController; this.restController = restController;
this.nodesInfoAction = nodesInfoAction; this.nodeService = nodeService;
} }
@Override protected void doStart() throws ElasticSearchException { @Override protected void doStart() throws ElasticSearchException {
@ -50,11 +50,11 @@ public class MemcachedServer extends AbstractLifecycleComponent<MemcachedServer>
if (logger.isInfoEnabled()) { if (logger.isInfoEnabled()) {
logger.info("{}", transport.boundAddress()); logger.info("{}", transport.boundAddress());
} }
nodesInfoAction.putNodeAttribute("memcached_address", transport.boundAddress().publishAddress().toString()); nodeService.putNodeAttribute("memcached_address", transport.boundAddress().publishAddress().toString());
} }
@Override protected void doStop() throws ElasticSearchException { @Override protected void doStop() throws ElasticSearchException {
nodesInfoAction.removeNodeAttribute("memcached_address"); nodeService.removeNodeAttribute("memcached_address");
transport.stop(); transport.stop();
} }

View File

@ -28,13 +28,13 @@ import org.apache.thrift.transport.TFramedTransport;
import org.apache.thrift.transport.TServerSocket; import org.apache.thrift.transport.TServerSocket;
import org.apache.thrift.transport.TTransportFactory; import org.apache.thrift.transport.TTransportFactory;
import org.elasticsearch.ElasticSearchException; import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.action.admin.cluster.node.info.TransportNodesInfoAction;
import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.PortsRange; import org.elasticsearch.common.transport.PortsRange;
import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.node.service.NodeService;
import org.elasticsearch.transport.BindTransportException; import org.elasticsearch.transport.BindTransportException;
import java.io.IOException; import java.io.IOException;
@ -59,7 +59,7 @@ public class ThriftServer extends AbstractLifecycleComponent<ThriftServer> {
private final NetworkService networkService; private final NetworkService networkService;
private final TransportNodesInfoAction nodesInfoAction; private final NodeService nodeService;
private final ThriftRestImpl client; private final ThriftRestImpl client;
@ -69,11 +69,11 @@ public class ThriftServer extends AbstractLifecycleComponent<ThriftServer> {
private volatile int portNumber; private volatile int portNumber;
@Inject public ThriftServer(Settings settings, NetworkService networkService, TransportNodesInfoAction nodesInfoAction, ThriftRestImpl client) { @Inject public ThriftServer(Settings settings, NetworkService networkService, NodeService nodeService, ThriftRestImpl client) {
super(settings); super(settings);
this.client = client; this.client = client;
this.networkService = networkService; this.networkService = networkService;
this.nodesInfoAction = nodesInfoAction; this.nodeService = nodeService;
this.frame = (int) componentSettings.getAsBytesSize("frame", new ByteSizeValue(-1)).bytes(); this.frame = (int) componentSettings.getAsBytesSize("frame", new ByteSizeValue(-1)).bytes();
this.port = componentSettings.get("port", "9500-9600"); this.port = componentSettings.get("port", "9500-9600");
this.bindHost = componentSettings.get("bind_host", settings.get("transport.bind_host", settings.get("transport.host"))); this.bindHost = componentSettings.get("bind_host", settings.get("transport.bind_host", settings.get("transport.host")));
@ -134,7 +134,7 @@ public class ThriftServer extends AbstractLifecycleComponent<ThriftServer> {
} }
logger.info("bound on port [{}]", portNumber); logger.info("bound on port [{}]", portNumber);
try { try {
nodesInfoAction.putNodeAttribute("thrift_address", new InetSocketAddress(networkService.resolvePublishHostAddress(publishHost), portNumber).toString()); nodeService.putNodeAttribute("thrift_address", new InetSocketAddress(networkService.resolvePublishHostAddress(publishHost), portNumber).toString());
} catch (Exception e) { } catch (Exception e) {
// ignore // ignore
} }
@ -147,7 +147,7 @@ public class ThriftServer extends AbstractLifecycleComponent<ThriftServer> {
} }
@Override protected void doStop() throws ElasticSearchException { @Override protected void doStop() throws ElasticSearchException {
nodesInfoAction.removeNodeAttribute("thrift_address"); nodeService.removeNodeAttribute("thrift_address");
server.stop(); server.stop();
} }