From 7e5bfb5d3b61e5991d8e44fdc2bc28d2e6362261 Mon Sep 17 00:00:00 2001 From: kimchy Date: Thu, 27 May 2010 18:25:59 +0300 Subject: [PATCH] share the open channels handler --- .../http/netty/NettyHttpServerTransport.java | 1 + .../transport/netty/NettyTransport.java | 1 + .../transport/netty/OpenChannelsHandler.java | 60 ------------------- .../netty/OpenChannelsHandler.java | 2 +- .../netty/NettyMemcachedServerTransport.java | 1 + .../memcached/netty/OpenChannelsHandler.java | 59 ------------------ 6 files changed, 4 insertions(+), 120 deletions(-) delete mode 100644 modules/elasticsearch/src/main/java/org/elasticsearch/transport/netty/OpenChannelsHandler.java rename modules/elasticsearch/src/main/java/org/elasticsearch/{http => util}/netty/OpenChannelsHandler.java (98%) delete mode 100644 plugins/transport/memcached/src/main/java/org/elasticsearch/memcached/netty/OpenChannelsHandler.java diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/http/netty/NettyHttpServerTransport.java b/modules/elasticsearch/src/main/java/org/elasticsearch/http/netty/NettyHttpServerTransport.java index 32254f443fc..3b318ddba50 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/http/netty/NettyHttpServerTransport.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/http/netty/NettyHttpServerTransport.java @@ -27,6 +27,7 @@ import org.elasticsearch.util.SizeUnit; import org.elasticsearch.util.SizeValue; import org.elasticsearch.util.component.AbstractLifecycleComponent; import org.elasticsearch.util.inject.Inject; +import org.elasticsearch.util.netty.OpenChannelsHandler; import org.elasticsearch.util.network.NetworkService; import org.elasticsearch.util.network.NetworkUtils; import org.elasticsearch.util.settings.Settings; diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/transport/netty/NettyTransport.java b/modules/elasticsearch/src/main/java/org/elasticsearch/transport/netty/NettyTransport.java index 7d63e801328..bd5692bfa00 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/transport/netty/NettyTransport.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/transport/netty/NettyTransport.java @@ -33,6 +33,7 @@ import org.elasticsearch.util.inject.Inject; import org.elasticsearch.util.io.stream.BytesStreamOutput; import org.elasticsearch.util.io.stream.HandlesStreamOutput; import org.elasticsearch.util.io.stream.Streamable; +import org.elasticsearch.util.netty.OpenChannelsHandler; import org.elasticsearch.util.network.NetworkService; import org.elasticsearch.util.network.NetworkUtils; import org.elasticsearch.util.settings.Settings; diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/transport/netty/OpenChannelsHandler.java b/modules/elasticsearch/src/main/java/org/elasticsearch/transport/netty/OpenChannelsHandler.java deleted file mode 100644 index 5bd59ccae96..00000000000 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/transport/netty/OpenChannelsHandler.java +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Licensed to Elastic Search and Shay Banon under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. Elastic Search licenses this - * file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.transport.netty; - -import org.elasticsearch.util.concurrent.ConcurrentCollections; -import org.jboss.netty.channel.*; - -import java.util.Set; - -/** - * @author kimchy (Shay Banon) - */ -@ChannelHandler.Sharable -public class OpenChannelsHandler implements ChannelUpstreamHandler { - - private Set openChannels = ConcurrentCollections.newConcurrentSet(); - - private final ChannelFutureListener remover = new ChannelFutureListener() { - public void operationComplete(ChannelFuture future) throws Exception { - openChannels.remove(future.getChannel()); - } - }; - - @Override public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e) throws Exception { - if (e instanceof ChannelStateEvent) { - ChannelStateEvent evt = (ChannelStateEvent) e; - if (evt.getState() == ChannelState.OPEN) { - boolean added = openChannels.add(ctx.getChannel()); - if (added) { - ctx.getChannel().getCloseFuture().addListener(remover); - } - } - } - ctx.sendUpstream(e); - } - - public void close() { - for (Channel channel : openChannels) { - channel.close().awaitUninterruptibly(); - } - openChannels.clear(); - } -} diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/http/netty/OpenChannelsHandler.java b/modules/elasticsearch/src/main/java/org/elasticsearch/util/netty/OpenChannelsHandler.java similarity index 98% rename from modules/elasticsearch/src/main/java/org/elasticsearch/http/netty/OpenChannelsHandler.java rename to modules/elasticsearch/src/main/java/org/elasticsearch/util/netty/OpenChannelsHandler.java index ac25d9618b8..99668229b21 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/http/netty/OpenChannelsHandler.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/util/netty/OpenChannelsHandler.java @@ -17,7 +17,7 @@ * under the License. */ -package org.elasticsearch.http.netty; +package org.elasticsearch.util.netty; import org.elasticsearch.util.concurrent.ConcurrentCollections; import org.jboss.netty.channel.*; diff --git a/plugins/transport/memcached/src/main/java/org/elasticsearch/memcached/netty/NettyMemcachedServerTransport.java b/plugins/transport/memcached/src/main/java/org/elasticsearch/memcached/netty/NettyMemcachedServerTransport.java index b46eb974970..a6d215c9b9c 100644 --- a/plugins/transport/memcached/src/main/java/org/elasticsearch/memcached/netty/NettyMemcachedServerTransport.java +++ b/plugins/transport/memcached/src/main/java/org/elasticsearch/memcached/netty/NettyMemcachedServerTransport.java @@ -27,6 +27,7 @@ import org.elasticsearch.transport.BindTransportException; import org.elasticsearch.util.SizeValue; import org.elasticsearch.util.component.AbstractLifecycleComponent; import org.elasticsearch.util.inject.Inject; +import org.elasticsearch.util.netty.OpenChannelsHandler; import org.elasticsearch.util.network.NetworkService; import org.elasticsearch.util.network.NetworkUtils; import org.elasticsearch.util.settings.Settings; diff --git a/plugins/transport/memcached/src/main/java/org/elasticsearch/memcached/netty/OpenChannelsHandler.java b/plugins/transport/memcached/src/main/java/org/elasticsearch/memcached/netty/OpenChannelsHandler.java deleted file mode 100644 index 9c1e04a610f..00000000000 --- a/plugins/transport/memcached/src/main/java/org/elasticsearch/memcached/netty/OpenChannelsHandler.java +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Licensed to Elastic Search and Shay Banon under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. Elastic Search licenses this - * file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.memcached.netty; - -import org.elasticsearch.util.concurrent.ConcurrentCollections; -import org.jboss.netty.channel.*; - -import java.util.Set; - -/** - * @author kimchy (shay.banon) - */ -@ChannelHandler.Sharable -public class OpenChannelsHandler implements ChannelUpstreamHandler { - - private Set openChannels = ConcurrentCollections.newConcurrentSet(); - - private final ChannelFutureListener remover = new ChannelFutureListener() { - public void operationComplete(ChannelFuture future) throws Exception { - openChannels.remove(future.getChannel()); - } - }; - - @Override public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e) throws Exception { - if (e instanceof ChannelStateEvent) { - ChannelStateEvent evt = (ChannelStateEvent) e; - if (evt.getState() == ChannelState.OPEN) { - boolean added = openChannels.add(ctx.getChannel()); - if (added) { - ctx.getChannel().getCloseFuture().addListener(remover); - } - } - } - ctx.sendUpstream(e); - } - - public void close() { - for (Channel channel : openChannels) { - channel.close().awaitUninterruptibly(); - } - } -} \ No newline at end of file