diff --git a/src/main/java/org/elasticsearch/transport/netty/NettyTransport.java b/src/main/java/org/elasticsearch/transport/netty/NettyTransport.java index 80e3c3a9b95..6ec63ece023 100644 --- a/src/main/java/org/elasticsearch/transport/netty/NettyTransport.java +++ b/src/main/java/org/elasticsearch/transport/netty/NettyTransport.java @@ -21,10 +21,7 @@ package org.elasticsearch.transport.netty; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; -import org.elasticsearch.ElasticsearchException; -import org.elasticsearch.ElasticsearchIllegalArgumentException; -import org.elasticsearch.ElasticsearchIllegalStateException; -import org.elasticsearch.Version; +import org.elasticsearch.*; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.Strings; import org.elasticsearch.common.bytes.ReleasableBytesReference; @@ -598,48 +595,38 @@ public class NettyTransport extends AbstractLifecycleComponent implem } globalLock.readLock().lock(); try { - if (!lifecycle.started()) { - throw new ElasticsearchIllegalStateException("can't add nodes to a stopped transport"); - } - NodeChannels nodeChannels = connectedNodes.get(node); - if (nodeChannels != null) { - return; - } connectionLock.acquire(node.id()); try { if (!lifecycle.started()) { throw new ElasticsearchIllegalStateException("can't add nodes to a stopped transport"); } + NodeChannels nodeChannels = connectedNodes.get(node); + if (nodeChannels != null) { + return; + } try { - - if (light) { nodeChannels = connectToChannelsLight(node); } else { nodeChannels = new NodeChannels(new Channel[connectionsPerNodeRecovery], new Channel[connectionsPerNodeBulk], new Channel[connectionsPerNodeReg], new Channel[connectionsPerNodeState], new Channel[connectionsPerNodePing]); try { connectToChannels(nodeChannels, node); - } catch (Exception e) { + } catch (Throwable e) { + logger.trace("failed to connect to [{}], cleaning dangling connections", e, node); nodeChannels.close(); throw e; } } - - NodeChannels existing = connectedNodes.putIfAbsent(node, nodeChannels); - if (existing != null) { - // we are already connected to a node, close this ones - nodeChannels.close(); - } else { - if (logger.isDebugEnabled()) { - logger.debug("connected to node [{}]", node); - } - transportServiceAdapter.raiseNodeConnected(node); + // we acquire a connection lock, so no way there is an existing connection + connectedNodes.put(node, nodeChannels); + if (logger.isDebugEnabled()) { + logger.debug("connected to node [{}]", node); } - + transportServiceAdapter.raiseNodeConnected(node); } catch (ConnectTransportException e) { throw e; } catch (Exception e) { - throw new ConnectTransportException(node, "General node connection failure", e); + throw new ConnectTransportException(node, "general node connection failure", e); } } finally { connectionLock.release(node.id()); @@ -759,45 +746,51 @@ public class NettyTransport extends AbstractLifecycleComponent implem @Override public void disconnectFromNode(DiscoveryNode node) { - NodeChannels nodeChannels = connectedNodes.remove(node); - if (nodeChannels != null) { - connectionLock.acquire(node.id()); - try { + connectionLock.acquire(node.id()); + try { + NodeChannels nodeChannels = connectedNodes.remove(node); + if (nodeChannels != null) { try { + logger.debug("disconnecting from [{}] due to explicit disconnect call", node); nodeChannels.close(); } finally { - logger.debug("disconnected from [{}]", node); + logger.trace("disconnected from [{}] due to explicit disconnect call", node); transportServiceAdapter.raiseNodeDisconnected(node); } - } finally { - connectionLock.release(node.id()); } + } finally { + connectionLock.release(node.id()); } } /** * Disconnects from a node, only if the relevant channel is found to be part of the node channels. */ - private void disconnectFromNode(DiscoveryNode node, Channel channel, String reason) { + private boolean disconnectFromNode(DiscoveryNode node, Channel channel, String reason) { + // this might be called multiple times from all the node channels, so do a lightweight + // check outside of the lock NodeChannels nodeChannels = connectedNodes.get(node); if (nodeChannels != null && nodeChannels.hasChannel(channel)) { connectionLock.acquire(node.id()); - if (!nodeChannels.hasChannel(channel)) { //might have been removed in the meanwhile, safety check - assert !connectedNodes.containsKey(node); - } else { - try { + try { + nodeChannels = connectedNodes.get(node); + // check again within the connection lock, if its still applicable to remove it + if (nodeChannels != null && nodeChannels.hasChannel(channel)) { connectedNodes.remove(node); try { + logger.debug("disconnecting from [{}], {}", node, reason); nodeChannels.close(); } finally { - logger.debug("disconnected from [{}], {}", node, reason); + logger.trace("disconnected from [{}], {}", node, reason); transportServiceAdapter.raiseNodeDisconnected(node); } - } finally { - connectionLock.release(node.id()); + return true; } + } finally { + connectionLock.release(node.id()); } } + return false; } /** @@ -805,24 +798,10 @@ public class NettyTransport extends AbstractLifecycleComponent implem */ private void disconnectFromNodeChannel(Channel channel, Throwable failure) { for (DiscoveryNode node : connectedNodes.keySet()) { - NodeChannels nodeChannels = connectedNodes.get(node); - if (nodeChannels != null && nodeChannels.hasChannel(channel)) { - connectionLock.acquire(node.id()); - if (!nodeChannels.hasChannel(channel)) { //might have been removed in the meanwhile, safety check - assert !connectedNodes.containsKey(node); - } else { - try { - connectedNodes.remove(node); - try { - nodeChannels.close(); - } finally { - logger.debug("disconnected from [{}] on channel failure", failure, node); - transportServiceAdapter.raiseNodeDisconnected(node); - } - } finally { - connectionLock.release(node.id()); - } - } + if (disconnectFromNode(node, channel, ExceptionsHelper.detailedMessage(failure))) { + // if we managed to find this channel and disconnect from it, then break, no need to check on + // the rest of the nodes + break; } } }