simplify netty transport to use single channel

This commit is contained in:
kimchy 2010-06-17 12:50:01 +03:00
parent eded80805b
commit 3bc8c307f1

View File

@ -57,15 +57,12 @@ import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import static org.elasticsearch.common.collect.Lists.*;
import static org.elasticsearch.common.network.NetworkService.TcpSettings.*;
import static org.elasticsearch.common.settings.ImmutableSettings.Builder.*;
import static org.elasticsearch.common.transport.NetworkExceptionHelper.*;
@ -99,8 +96,6 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
final TimeValue connectTimeout;
final int connectionsPerNode;
final Boolean tcpNoDelay;
final Boolean tcpKeepAlive;
@ -120,7 +115,7 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
private volatile ServerBootstrap serverBootstrap;
// node id to actual channel
final ConcurrentMap<String, NodeConnections> connectedNodes = newConcurrentMap();
final ConcurrentMap<DiscoveryNode, Channel> connectedNodes = newConcurrentMap();
private volatile Channel serverChannel;
@ -145,7 +140,6 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
this.workerCount = componentSettings.getAsInt("worker_count", Runtime.getRuntime().availableProcessors());
this.port = componentSettings.get("port", settings.get("transport.tcp.port", "9300-9400"));
this.bindHost = componentSettings.get("bind_host");
this.connectionsPerNode = componentSettings.getAsInt("connections_per_node", settings.getAsInt("transport.tcp.connection_per_node", 1));
this.publishHost = componentSettings.get("publish_host");
this.connectTimeout = componentSettings.getAsTime("connect_timeout", settings.getAsTime("transport.tcp.connect_timeout", timeValueSeconds(1)));
this.tcpNoDelay = componentSettings.getAsBoolean("tcp_no_delay", settings.getAsBoolean(TCP_NO_DELAY, true));
@ -295,30 +289,14 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
serverBootstrap = null;
}
for (Iterator<NodeConnections> it = connectedNodes.values().iterator(); it.hasNext();) {
NodeConnections nodeConnections = it.next();
for (Iterator<Channel> it = connectedNodes.values().iterator(); it.hasNext();) {
Channel channel = it.next();
it.remove();
nodeConnections.close();
closeChannel(channel);
}
if (clientBootstrap != null) {
// HACK, make sure we try and close open client channels also after
// we releaseExternalResources, they seem to hang when there are open client channels
ScheduledFuture<?> scheduledFuture = threadPool.schedule(new Runnable() {
@Override public void run() {
try {
for (Iterator<NodeConnections> it = connectedNodes.values().iterator(); it.hasNext();) {
NodeConnections nodeConnections = it.next();
it.remove();
nodeConnections.close();
}
} catch (Exception e) {
// ignore
}
}
}, 500, TimeUnit.MILLISECONDS);
clientBootstrap.releaseExternalResources();
scheduledFuture.cancel(false);
clientBootstrap = null;
}
}
@ -362,7 +340,15 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
if (!lifecycle.started()) {
// ignore
}
if (isCloseConnectionException(e.getCause()) || isConnectException(e.getCause())) {
if (isCloseConnectionException(e.getCause())) {
// disconnect the node
Channel channel = ctx.getChannel();
for (Map.Entry<DiscoveryNode, Channel> entry : connectedNodes.entrySet()) {
if (entry.getValue().equals(channel)) {
disconnectFromNode(entry.getKey());
}
}
} else if (isConnectException(e.getCause())) {
if (logger.isTraceEnabled()) {
logger.trace("(Ignoring) Exception caught on netty layer [" + ctx.getChannel() + "]", e.getCause());
}
@ -401,21 +387,22 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
}
buffer.setInt(0, size); // update real size.
ChannelFuture channelFuture = targetChannel.write(buffer);
channelFuture.addListener(new ChannelFutureListener() {
@Override public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
// maybe add back the retry?
TransportResponseHandler handler = transportServiceAdapter.remove(requestId);
if (handler != null) {
handler.handleException(new RemoteTransportException("Failed write request", new SendRequestTransportException(node, action, future.getCause())));
}
}
}
});
// Close-connection exceptions are handled in the #exceptionCaught method; handling them was the main reason for adding a listener to this future, so the listener below is disabled
// channelFuture.addListener(new ChannelFutureListener() {
// @Override public void operationComplete(ChannelFuture future) throws Exception {
// if (!future.isSuccess()) {
// // maybe add back the retry?
// TransportResponseHandler handler = transportServiceAdapter.remove(requestId);
// if (handler != null) {
// handler.handleException(new RemoteTransportException("Failed write request", new SendRequestTransportException(node, action, future.getCause())));
// }
// }
// }
// });
}
@Override public boolean nodeConnected(DiscoveryNode node) {
return connectedNodes.containsKey(node.id());
return connectedNodes.containsKey(node);
}
@Override public void connectToNode(DiscoveryNode node) {
@ -426,146 +413,73 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
if (node == null) {
throw new ConnectTransportException(node, "Can't connect to a null node");
}
NodeConnections nodeConnections = connectedNodes.get(node.id());
if (nodeConnections != null) {
Channel channel = connectedNodes.get(node);
if (channel != null) {
return;
}
synchronized (this) {
// recheck here, within the sync block (we cache connections, so we don't care about this single sync block)
nodeConnections = connectedNodes.get(node.id());
if (nodeConnections != null) {
channel = connectedNodes.get(node);
if (channel != null) {
return;
}
List<ChannelFuture> connectFutures = newArrayList();
for (int connectionIndex = 0; connectionIndex < connectionsPerNode; connectionIndex++) {
InetSocketAddress address = ((InetSocketTransportAddress) node.address()).address();
connectFutures.add(clientBootstrap.connect(address));
InetSocketAddress address = ((InetSocketTransportAddress) node.address()).address();
ChannelFuture connectFuture = clientBootstrap.connect(address);
connectFuture.awaitUninterruptibly((long) (connectTimeout.millis() * 1.25));
if (!connectFuture.isSuccess()) {
throw new ConnectTransportException(node, "connect_timeout[" + connectTimeout + "]", connectFuture.getCause());
}
List<Channel> channels = newArrayList();
Throwable lastConnectException = null;
for (ChannelFuture connectFuture : connectFutures) {
if (!lifecycle.started()) {
for (Channel channel : channels) {
channel.close().awaitUninterruptibly();
}
throw new ConnectTransportException(node, "Can't connect when the transport is stopped");
}
connectFuture.awaitUninterruptibly((long) (connectTimeout.millis() * 1.25));
if (!connectFuture.isSuccess()) {
lastConnectException = connectFuture.getCause();
} else {
Channel channel = connectFuture.getChannel();
channel.getCloseFuture().addListener(new ChannelCloseListener(node.id()));
channels.add(channel);
}
}
if (channels.isEmpty()) {
if (lastConnectException != null) {
throw new ConnectTransportException(node, "connect_timeout[" + connectTimeout + "]", lastConnectException);
}
throw new ConnectTransportException(node, "connect_timeout[" + connectTimeout + "], reason unknown");
}
channel = connectFuture.getChannel();
channel.getCloseFuture().addListener(new ChannelCloseListener(node));
connectedNodes.put(node, channel);
if (logger.isDebugEnabled()) {
logger.debug("Connected to node[{}], number_of_connections[{}]", node, channels.size());
logger.debug("Connected to node [{}]", node);
}
connectedNodes.put(node.id(), new NodeConnections(node, channels.toArray(new Channel[channels.size()])));
transportServiceAdapter.raiseNodeConnected(node);
}
transportServiceAdapter.raiseNodeConnected(node);
} catch (Exception e) {
throw new ConnectTransportException(node, "General node connection failure", e);
}
}
@Override public void disconnectFromNode(DiscoveryNode node) {
NodeConnections nodeConnections = connectedNodes.remove(node.id());
if (nodeConnections != null) {
nodeConnections.close();
Channel channel = connectedNodes.remove(node);
if (channel != null) {
try {
closeChannel(channel);
} finally {
logger.debug("Disconnected from [{}]", node);
transportServiceAdapter.raiseNodeDisconnected(node);
}
}
}
private Channel nodeChannel(DiscoveryNode node) throws ConnectTransportException {
NettyTransport.NodeConnections nodeConnections = connectedNodes.get(node.id());
if (nodeConnections == null) {
throw new NodeNotConnectedException(node, "Node not connected");
}
Channel channel = nodeConnections.channel();
Channel channel = connectedNodes.get(node);
if (channel == null) {
throw new NodeNotConnectedException(node, "Node not connected");
}
return channel;
}
public class NodeConnections {
private final DiscoveryNode node;
private final AtomicInteger counter = new AtomicInteger();
private volatile Channel[] channels;
private volatile boolean closed = false;
private NodeConnections(DiscoveryNode node, Channel[] channels) {
this.node = node;
this.channels = channels;
}
private Channel channel() {
return channels[Math.abs(counter.incrementAndGet()) % channels.length];
}
private void channelClosed(Channel closedChannel) {
List<Channel> updated = newArrayList();
for (Channel channel : channels) {
if (!channel.getId().equals(closedChannel.getId())) {
updated.add(channel);
}
}
this.channels = updated.toArray(new Channel[updated.size()]);
}
private int numberOfChannels() {
return channels.length;
}
private synchronized void close() {
if (closed) {
return;
}
closed = true;
Channel[] channelsToClose = channels;
channels = new Channel[0];
for (Channel channel : channelsToClose) {
if (channel.isOpen()) {
channel.close().awaitUninterruptibly();
}
}
logger.debug("Disconnected from [{}]", node);
transportServiceAdapter.raiseNodeDisconnected(node);
/**
 * Closes the given channel if it is still open and waits for the close to complete.
 *
 * @param channel the channel to close; nothing is done when it is no longer open
 */
private void closeChannel(Channel channel) {
if (channel.isOpen()) {
// wait on the close future so the channel is closed by the time we return
channel.close().awaitUninterruptibly();
}
}
private class ChannelCloseListener implements ChannelFutureListener {
private final String nodeId;
private final DiscoveryNode node;
private ChannelCloseListener(String nodeId) {
this.nodeId = nodeId;
private ChannelCloseListener(DiscoveryNode node) {
this.node = node;
}
@Override public void operationComplete(ChannelFuture future) throws Exception {
final NodeConnections nodeConnections = connectedNodes.get(nodeId);
if (nodeConnections != null) {
nodeConnections.channelClosed(future.getChannel());
if (nodeConnections.numberOfChannels() == 0) {
// all the channels in the node connections are closed, remove it from
// our client channels
connectedNodes.remove(nodeId);
// and close it
nodeConnections.close();
}
}
disconnectFromNode(node);
}
}
}