better handlign of open channels

This commit is contained in:
Shay Banon 2011-11-11 22:18:37 +02:00
parent 93f1d50c18
commit 99d31cc8c8
5 changed files with 48 additions and 12 deletions

View File

@ -19,6 +19,7 @@
package org.elasticsearch.common.netty;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.metrics.CounterMetric;
import org.elasticsearch.common.netty.channel.Channel;
import org.elasticsearch.common.netty.channel.ChannelEvent;
@ -39,25 +40,40 @@ import java.util.Set;
@ChannelHandler.Sharable
public class OpenChannelsHandler implements ChannelUpstreamHandler {
private Set<Channel> openChannels = ConcurrentCollections.newConcurrentSet();
private CounterMetric openChannelsMetric = new CounterMetric();
final Set<Channel> openChannels = ConcurrentCollections.newConcurrentSet();
final CounterMetric openChannelsMetric = new CounterMetric();
final CounterMetric totalChannelsMetric = new CounterMetric();
private final ChannelFutureListener remover = new ChannelFutureListener() {
final ESLogger logger;
public OpenChannelsHandler(ESLogger logger) {
this.logger = logger;
}
final ChannelFutureListener remover = new ChannelFutureListener() {
public void operationComplete(ChannelFuture future) throws Exception {
boolean removed = openChannels.remove(future.getChannel());
if (removed) {
openChannelsMetric.dec();
}
if (logger.isTraceEnabled()) {
logger.trace("channel closed: {}", future.getChannel());
}
}
};
@Override public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e) throws Exception {
if (e instanceof ChannelStateEvent) {
ChannelStateEvent evt = (ChannelStateEvent) e;
if (evt.getState() == ChannelState.OPEN) {
// OPEN is also sent to when closing channel, but with FALSE on it to indicate it closes
if (evt.getState() == ChannelState.OPEN && Boolean.TRUE.equals(evt.getValue())) {
if (logger.isTraceEnabled()) {
logger.trace("channel opened: {}", ctx.getChannel());
}
boolean added = openChannels.add(ctx.getChannel());
if (added) {
openChannelsMetric.inc();
totalChannelsMetric.inc();
ctx.getChannel().getCloseFuture().addListener(remover);
}
}
@ -69,6 +85,10 @@ public class OpenChannelsHandler implements ChannelUpstreamHandler {
return openChannelsMetric.count();
}
public long totalChannels() {
return totalChannelsMetric.count();
}
public void close() {
for (Channel channel : openChannels) {
channel.close().awaitUninterruptibly();

View File

@ -30,13 +30,15 @@ import java.io.IOException;
public class HttpStats implements Streamable, ToXContent {
private long serverOpen;
private long totalOpen;
HttpStats() {
}
public HttpStats(long serverOpen) {
public HttpStats(long serverOpen, long totalOpen) {
this.serverOpen = serverOpen;
this.totalOpen = totalOpen;
}
public long serverOpen() {
@ -47,6 +49,14 @@ public class HttpStats implements Streamable, ToXContent {
return serverOpen();
}
public long totalOpen() {
return this.totalOpen;
}
public long getTotalOpen() {
return this.totalOpen;
}
public static HttpStats readHttpStats(StreamInput in) throws IOException {
HttpStats stats = new HttpStats();
stats.readFrom(in);
@ -55,15 +65,18 @@ public class HttpStats implements Streamable, ToXContent {
@Override public void readFrom(StreamInput in) throws IOException {
serverOpen = in.readVLong();
totalOpen = in.readVLong();
}
@Override public void writeTo(StreamOutput out) throws IOException {
out.writeVLong(serverOpen);
out.writeVLong(totalOpen);
}
@Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject("http");
builder.field("server_open", serverOpen);
builder.field("current_open", serverOpen);
builder.field("total_opened", totalOpen);
builder.endObject();
return builder;
}

View File

@ -163,7 +163,7 @@ public class NettyHttpServerTransport extends AbstractLifecycleComponent<HttpSer
}
@Override protected void doStart() throws ElasticSearchException {
this.serverOpenChannels = new OpenChannelsHandler();
this.serverOpenChannels = new OpenChannelsHandler(logger);
if (blockingServer) {
serverBootstrap = new ServerBootstrap(new OioServerSocketChannelFactory(
@ -258,7 +258,7 @@ public class NettyHttpServerTransport extends AbstractLifecycleComponent<HttpSer
@Override public HttpStats stats() {
OpenChannelsHandler channels = serverOpenChannels;
return new HttpStats(channels == null ? 0 : channels.numberOfOpenChannels());
return new HttpStats(channels == null ? 0 : channels.numberOfOpenChannels(), channels == null ? 0 : channels.totalChannels());
}
void dispatchRequest(HttpRequest request, HttpChannel channel) {
@ -277,8 +277,11 @@ public class NettyHttpServerTransport extends AbstractLifecycleComponent<HttpSer
return;
}
if (!NetworkExceptionHelper.isCloseConnectionException(e.getCause())) {
logger.warn("Caught exception while handling client http traffic, closing connection", e.getCause());
ctx.getChannel().disconnect();
logger.warn("Caught exception while handling client http traffic, closing connection {}", e.getCause(), ctx.getChannel());
ctx.getChannel().close();
} else {
logger.debug("Caught exception while handling client http traffic, closing connection {}", e.getCause(), ctx.getChannel());
ctx.getChannel().close();
}
}
}

View File

@ -245,7 +245,7 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
return;
}
serverOpenChannels = new OpenChannelsHandler();
serverOpenChannels = new OpenChannelsHandler(logger);
if (blockingServer) {
serverBootstrap = new ServerBootstrap(new OioServerSocketChannelFactory(
Executors.newCachedThreadPool(daemonThreadFactory(settings, "transport_server_boss")),

View File

@ -110,7 +110,7 @@ public class NettyMemcachedServerTransport extends AbstractLifecycleComponent<Me
}
@Override protected void doStart() throws ElasticSearchException {
this.serverOpenChannels = new OpenChannelsHandler();
this.serverOpenChannels = new OpenChannelsHandler(logger);
if (blockingServer) {
serverBootstrap = new ServerBootstrap(new OioServerSocketChannelFactory(