diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/common/netty/OpenChannelsHandler.java b/modules/elasticsearch/src/main/java/org/elasticsearch/common/netty/OpenChannelsHandler.java index a38232597ed..65385e64172 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/common/netty/OpenChannelsHandler.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/common/netty/OpenChannelsHandler.java @@ -19,6 +19,7 @@ package org.elasticsearch.common.netty; +import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.metrics.CounterMetric; import org.elasticsearch.common.netty.channel.Channel; import org.elasticsearch.common.netty.channel.ChannelEvent; @@ -39,25 +40,40 @@ import java.util.Set; @ChannelHandler.Sharable public class OpenChannelsHandler implements ChannelUpstreamHandler { - private Set openChannels = ConcurrentCollections.newConcurrentSet(); - private CounterMetric openChannelsMetric = new CounterMetric(); + final Set openChannels = ConcurrentCollections.newConcurrentSet(); + final CounterMetric openChannelsMetric = new CounterMetric(); + final CounterMetric totalChannelsMetric = new CounterMetric(); - private final ChannelFutureListener remover = new ChannelFutureListener() { + final ESLogger logger; + + public OpenChannelsHandler(ESLogger logger) { + this.logger = logger; + } + + final ChannelFutureListener remover = new ChannelFutureListener() { public void operationComplete(ChannelFuture future) throws Exception { boolean removed = openChannels.remove(future.getChannel()); if (removed) { openChannelsMetric.dec(); } + if (logger.isTraceEnabled()) { + logger.trace("channel closed: {}", future.getChannel()); + } } }; @Override public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e) throws Exception { if (e instanceof ChannelStateEvent) { ChannelStateEvent evt = (ChannelStateEvent) e; - if (evt.getState() == ChannelState.OPEN) { + // OPEN is also sent to when closing channel, but with FALSE on it to indicate it closes + if (evt.getState() == ChannelState.OPEN && Boolean.TRUE.equals(evt.getValue())) { + if (logger.isTraceEnabled()) { + logger.trace("channel opened: {}", ctx.getChannel()); + } boolean added = openChannels.add(ctx.getChannel()); if (added) { openChannelsMetric.inc(); + totalChannelsMetric.inc(); ctx.getChannel().getCloseFuture().addListener(remover); } } @@ -69,6 +85,10 @@ public class OpenChannelsHandler implements ChannelUpstreamHandler { return openChannelsMetric.count(); } + public long totalChannels() { + return totalChannelsMetric.count(); + } + public void close() { for (Channel channel : openChannels) { channel.close().awaitUninterruptibly(); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/http/HttpStats.java b/modules/elasticsearch/src/main/java/org/elasticsearch/http/HttpStats.java index a22dc5b6325..cf41e76ea79 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/http/HttpStats.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/http/HttpStats.java @@ -30,13 +30,15 @@ import java.io.IOException; public class HttpStats implements Streamable, ToXContent { private long serverOpen; + private long totalOpen; HttpStats() { } - public HttpStats(long serverOpen) { + public HttpStats(long serverOpen, long totalOpen) { this.serverOpen = serverOpen; + this.totalOpen = totalOpen; } public long serverOpen() { @@ -47,6 +49,14 @@ public class HttpStats implements Streamable, ToXContent { return serverOpen(); } + public long totalOpen() { + return this.totalOpen; + } + + public long getTotalOpen() { + return this.totalOpen; + } + public static HttpStats readHttpStats(StreamInput in) throws IOException { HttpStats stats = new HttpStats(); stats.readFrom(in); @@ -55,15 +65,18 @@ public class HttpStats implements Streamable, ToXContent { @Override public void readFrom(StreamInput in) throws IOException { serverOpen = in.readVLong(); + totalOpen = in.readVLong(); } @Override public void writeTo(StreamOutput out) throws IOException { out.writeVLong(serverOpen); + out.writeVLong(totalOpen); } @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject("http"); - builder.field("server_open", serverOpen); + builder.field("current_open", serverOpen); + builder.field("total_opened", totalOpen); builder.endObject(); return builder; } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/http/netty/NettyHttpServerTransport.java b/modules/elasticsearch/src/main/java/org/elasticsearch/http/netty/NettyHttpServerTransport.java index 5888596052c..ce627d5472e 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/http/netty/NettyHttpServerTransport.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/http/netty/NettyHttpServerTransport.java @@ -163,7 +163,7 @@ public class NettyHttpServerTransport extends AbstractLifecycleComponent implem return; } - serverOpenChannels = new OpenChannelsHandler(); + serverOpenChannels = new OpenChannelsHandler(logger); if (blockingServer) { serverBootstrap = new ServerBootstrap(new OioServerSocketChannelFactory( Executors.newCachedThreadPool(daemonThreadFactory(settings, "transport_server_boss")), diff --git a/plugins/transport/memcached/src/main/java/org/elasticsearch/memcached/netty/NettyMemcachedServerTransport.java b/plugins/transport/memcached/src/main/java/org/elasticsearch/memcached/netty/NettyMemcachedServerTransport.java index 8c58d510121..54573670734 100644 --- a/plugins/transport/memcached/src/main/java/org/elasticsearch/memcached/netty/NettyMemcachedServerTransport.java +++ b/plugins/transport/memcached/src/main/java/org/elasticsearch/memcached/netty/NettyMemcachedServerTransport.java @@ -110,7 +110,7 @@ public class NettyMemcachedServerTransport extends AbstractLifecycleComponent