better shutdown of sockets

This commit is contained in:
kimchy 2011-01-24 13:15:57 +02:00
parent 3537de4530
commit 9d75849362

View File

@ -57,10 +57,7 @@ import java.io.IOException;
import java.net.InetAddress; import java.net.InetAddress;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.SocketAddress; import java.net.SocketAddress;
import java.util.Iterator; import java.util.*;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
@ -324,7 +321,7 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
for (Iterator<NodeChannels> it = connectedNodes.values().iterator(); it.hasNext();) { for (Iterator<NodeChannels> it = connectedNodes.values().iterator(); it.hasNext();) {
NodeChannels nodeChannels = it.next(); NodeChannels nodeChannels = it.next();
it.remove(); it.remove();
nodeChannels.closeAndWait(); nodeChannels.close();
} }
if (clientBootstrap != null) { if (clientBootstrap != null) {
@ -598,35 +595,21 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
} }
} }
public void close() { public synchronized void close() {
closeChannels(low); List<ChannelFuture> futures = new ArrayList<ChannelFuture>();
closeChannels(med); closeChannelsAndWait(low, futures);
closeChannels(high); closeChannelsAndWait(med, futures);
} closeChannelsAndWait(high, futures);
for (ChannelFuture future : futures) {
private void closeChannels(Channel[] channels) { future.awaitUninterruptibly();
for (Channel channel : channels) {
try {
if (channel != null && channel.isOpen()) {
channel.close();
}
} catch (Exception e) {
//ignore
}
} }
} }
public void closeAndWait() { private void closeChannelsAndWait(Channel[] channels, List<ChannelFuture> futures) {
closeChannelsAndWait(low);
closeChannelsAndWait(med);
closeChannelsAndWait(high);
}
private void closeChannelsAndWait(Channel[] channels) {
for (Channel channel : channels) { for (Channel channel : channels) {
try { try {
if (channel != null && channel.isOpen()) { if (channel != null && channel.isOpen()) {
channel.close().awaitUninterruptibly(); futures.add(channel.close());
} }
} catch (Exception e) { } catch (Exception e) {
//ignore //ignore