diff --git a/plugins/transport/memcached/src/main/java/org/elasticsearch/memcached/netty/NettyMemcachedServerTransport.java b/plugins/transport/memcached/src/main/java/org/elasticsearch/memcached/netty/NettyMemcachedServerTransport.java index a6d215c9b9c..b46eb974970 100644 --- a/plugins/transport/memcached/src/main/java/org/elasticsearch/memcached/netty/NettyMemcachedServerTransport.java +++ b/plugins/transport/memcached/src/main/java/org/elasticsearch/memcached/netty/NettyMemcachedServerTransport.java @@ -27,7 +27,6 @@ import org.elasticsearch.transport.BindTransportException; import org.elasticsearch.util.SizeValue; import org.elasticsearch.util.component.AbstractLifecycleComponent; import org.elasticsearch.util.inject.Inject; -import org.elasticsearch.util.netty.OpenChannelsHandler; import org.elasticsearch.util.network.NetworkService; import org.elasticsearch.util.network.NetworkUtils; import org.elasticsearch.util.settings.Settings; diff --git a/plugins/transport/memcached/src/main/java/org/elasticsearch/memcached/netty/OpenChannelsHandler.java b/plugins/transport/memcached/src/main/java/org/elasticsearch/memcached/netty/OpenChannelsHandler.java new file mode 100644 index 00000000000..9c1e04a610f --- /dev/null +++ b/plugins/transport/memcached/src/main/java/org/elasticsearch/memcached/netty/OpenChannelsHandler.java @@ -0,0 +1,59 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.memcached.netty; + +import org.elasticsearch.util.concurrent.ConcurrentCollections; +import org.jboss.netty.channel.*; + +import java.util.Set; + +/** + * @author kimchy (shay.banon) + */ +@ChannelHandler.Sharable +public class OpenChannelsHandler implements ChannelUpstreamHandler { + + private Set openChannels = ConcurrentCollections.newConcurrentSet(); + + private final ChannelFutureListener remover = new ChannelFutureListener() { + public void operationComplete(ChannelFuture future) throws Exception { + openChannels.remove(future.getChannel()); + } + }; + + @Override public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e) throws Exception { + if (e instanceof ChannelStateEvent) { + ChannelStateEvent evt = (ChannelStateEvent) e; + if (evt.getState() == ChannelState.OPEN) { + boolean added = openChannels.add(ctx.getChannel()); + if (added) { + ctx.getChannel().getCloseFuture().addListener(remover); + } + } + } + ctx.sendUpstream(e); + } + + public void close() { + for (Channel channel : openChannels) { + channel.close().awaitUninterruptibly(); + } + } +} \ No newline at end of file