Fix connect concurrency issue that can cause node connections to close

Looking at the connect code, if two threads try to connect to the same node at the same time, and both enter the connectionLock code block one after the other, the second one would try to put the connection in the map and close the replaced channels, which will cause the existing connection to close as well (since it removes the node from the connectedNodes map).
To fix this, simply make sure we properly check the existence of the connection within the connectionLock block, so there won't be concurrent connections going on.
While doing this, I also went over all the mutation code that handles disconnections, and made sure those mutations are only done while holding the connection lock.
closes #6964
This commit is contained in:
Shay Banon 2014-07-22 18:31:13 +02:00
parent 72b3d6ef75
commit 88f3afe4b5
1 changed files with 38 additions and 59 deletions

View File

@ -21,10 +21,7 @@ package org.elasticsearch.transport.netty;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import org.elasticsearch.ElasticsearchException; import org.elasticsearch.*;
import org.elasticsearch.ElasticsearchIllegalArgumentException;
import org.elasticsearch.ElasticsearchIllegalStateException;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Strings; import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.ReleasableBytesReference; import org.elasticsearch.common.bytes.ReleasableBytesReference;
@ -598,48 +595,38 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
} }
globalLock.readLock().lock(); globalLock.readLock().lock();
try { try {
if (!lifecycle.started()) {
throw new ElasticsearchIllegalStateException("can't add nodes to a stopped transport");
}
NodeChannels nodeChannels = connectedNodes.get(node);
if (nodeChannels != null) {
return;
}
connectionLock.acquire(node.id()); connectionLock.acquire(node.id());
try { try {
if (!lifecycle.started()) { if (!lifecycle.started()) {
throw new ElasticsearchIllegalStateException("can't add nodes to a stopped transport"); throw new ElasticsearchIllegalStateException("can't add nodes to a stopped transport");
} }
NodeChannels nodeChannels = connectedNodes.get(node);
if (nodeChannels != null) {
return;
}
try { try {
if (light) { if (light) {
nodeChannels = connectToChannelsLight(node); nodeChannels = connectToChannelsLight(node);
} else { } else {
nodeChannels = new NodeChannels(new Channel[connectionsPerNodeRecovery], new Channel[connectionsPerNodeBulk], new Channel[connectionsPerNodeReg], new Channel[connectionsPerNodeState], new Channel[connectionsPerNodePing]); nodeChannels = new NodeChannels(new Channel[connectionsPerNodeRecovery], new Channel[connectionsPerNodeBulk], new Channel[connectionsPerNodeReg], new Channel[connectionsPerNodeState], new Channel[connectionsPerNodePing]);
try { try {
connectToChannels(nodeChannels, node); connectToChannels(nodeChannels, node);
} catch (Exception e) { } catch (Throwable e) {
logger.trace("failed to connect to [{}], cleaning dangling connections", e, node);
nodeChannels.close(); nodeChannels.close();
throw e; throw e;
} }
} }
// we acquire a connection lock, so no way there is an existing connection
NodeChannels existing = connectedNodes.putIfAbsent(node, nodeChannels); connectedNodes.put(node, nodeChannels);
if (existing != null) { if (logger.isDebugEnabled()) {
// we are already connected to a node, close this ones logger.debug("connected to node [{}]", node);
nodeChannels.close();
} else {
if (logger.isDebugEnabled()) {
logger.debug("connected to node [{}]", node);
}
transportServiceAdapter.raiseNodeConnected(node);
} }
transportServiceAdapter.raiseNodeConnected(node);
} catch (ConnectTransportException e) { } catch (ConnectTransportException e) {
throw e; throw e;
} catch (Exception e) { } catch (Exception e) {
throw new ConnectTransportException(node, "General node connection failure", e); throw new ConnectTransportException(node, "general node connection failure", e);
} }
} finally { } finally {
connectionLock.release(node.id()); connectionLock.release(node.id());
@ -759,45 +746,51 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
@Override @Override
public void disconnectFromNode(DiscoveryNode node) { public void disconnectFromNode(DiscoveryNode node) {
NodeChannels nodeChannels = connectedNodes.remove(node); connectionLock.acquire(node.id());
if (nodeChannels != null) { try {
connectionLock.acquire(node.id()); NodeChannels nodeChannels = connectedNodes.remove(node);
try { if (nodeChannels != null) {
try { try {
logger.debug("disconnecting from [{}] due to explicit disconnect call", node);
nodeChannels.close(); nodeChannels.close();
} finally { } finally {
logger.debug("disconnected from [{}]", node); logger.trace("disconnected from [{}] due to explicit disconnect call", node);
transportServiceAdapter.raiseNodeDisconnected(node); transportServiceAdapter.raiseNodeDisconnected(node);
} }
} finally {
connectionLock.release(node.id());
} }
} finally {
connectionLock.release(node.id());
} }
} }
/** /**
* Disconnects from a node, only if the relevant channel is found to be part of the node channels. * Disconnects from a node, only if the relevant channel is found to be part of the node channels.
*/ */
private void disconnectFromNode(DiscoveryNode node, Channel channel, String reason) { private boolean disconnectFromNode(DiscoveryNode node, Channel channel, String reason) {
// this might be called multiple times from all the node channels, so do a lightweight
// check outside of the lock
NodeChannels nodeChannels = connectedNodes.get(node); NodeChannels nodeChannels = connectedNodes.get(node);
if (nodeChannels != null && nodeChannels.hasChannel(channel)) { if (nodeChannels != null && nodeChannels.hasChannel(channel)) {
connectionLock.acquire(node.id()); connectionLock.acquire(node.id());
if (!nodeChannels.hasChannel(channel)) { //might have been removed in the meanwhile, safety check try {
assert !connectedNodes.containsKey(node); nodeChannels = connectedNodes.get(node);
} else { // check again within the connection lock, if its still applicable to remove it
try { if (nodeChannels != null && nodeChannels.hasChannel(channel)) {
connectedNodes.remove(node); connectedNodes.remove(node);
try { try {
logger.debug("disconnecting from [{}], {}", node, reason);
nodeChannels.close(); nodeChannels.close();
} finally { } finally {
logger.debug("disconnected from [{}], {}", node, reason); logger.trace("disconnected from [{}], {}", node, reason);
transportServiceAdapter.raiseNodeDisconnected(node); transportServiceAdapter.raiseNodeDisconnected(node);
} }
} finally { return true;
connectionLock.release(node.id());
} }
} finally {
connectionLock.release(node.id());
} }
} }
return false;
} }
/** /**
@ -805,24 +798,10 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
*/ */
private void disconnectFromNodeChannel(Channel channel, Throwable failure) { private void disconnectFromNodeChannel(Channel channel, Throwable failure) {
for (DiscoveryNode node : connectedNodes.keySet()) { for (DiscoveryNode node : connectedNodes.keySet()) {
NodeChannels nodeChannels = connectedNodes.get(node); if (disconnectFromNode(node, channel, ExceptionsHelper.detailedMessage(failure))) {
if (nodeChannels != null && nodeChannels.hasChannel(channel)) { // if we managed to find this channel and disconnect from it, then break, no need to check on
connectionLock.acquire(node.id()); // the rest of the nodes
if (!nodeChannels.hasChannel(channel)) { //might have been removed in the meanwhile, safety check break;
assert !connectedNodes.containsKey(node);
} else {
try {
connectedNodes.remove(node);
try {
nodeChannels.close();
} finally {
logger.debug("disconnected from [{}] on channel failure", failure, node);
transportServiceAdapter.raiseNodeDisconnected(node);
}
} finally {
connectionLock.release(node.id());
}
}
} }
} }
} }