Ensure new connections won't be opened if transport is closed or closing (#22589)

Today there are several races / holes in TcpTransport and MockTcpTransport
that can allow connections to be opened and remain unclosed while the actual
transport implementation is closed. A recently added assertions in #22554 exposes
these problems. This commit fixes several issues related to missed locks or channel
creations outside of a lock not checking if the resource is still open.
This commit is contained in:
Simon Willnauer 2017-01-12 20:27:09 +01:00 committed by GitHub
parent 2db01b6127
commit acf2d2f86f
2 changed files with 63 additions and 34 deletions

View File

@ -432,15 +432,12 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
@Override
public void connectToNode(DiscoveryNode node, ConnectionProfile connectionProfile) {
connectionProfile = connectionProfile == null ? defaultConnectionProfile : connectionProfile;
if (!lifecycle.started()) {
throw new IllegalStateException("can't add nodes to a stopped transport");
}
if (node == null) {
throw new ConnectTransportException(null, "can't connect to a null node");
}
globalLock.readLock().lock();
globalLock.readLock().lock(); // ensure we don't open connections while we are closing
try {
ensureOpen();
try (Releasable ignored = connectionLock.acquire(node.getId())) {
if (!lifecycle.started()) {
throw new IllegalStateException("can't add nodes to a stopped transport");
@ -477,31 +474,40 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
@Override
public final NodeChannels openConnection(DiscoveryNode node, ConnectionProfile connectionProfile) throws IOException {
if (node == null) {
throw new ConnectTransportException(null, "can't open connection to a null node");
}
boolean success = false;
NodeChannels nodeChannels = null;
globalLock.readLock().lock(); // ensure we don't open connections while we are closing
try {
nodeChannels = connectToChannels(node, connectionProfile);
final Channel channel = nodeChannels.getChannels().get(0); // one channel is guaranteed by the connection profile
final TimeValue connectTimeout = connectionProfile.getConnectTimeout() == null ?
defaultConnectionProfile.getConnectTimeout() :
connectionProfile.getConnectTimeout();
final TimeValue handshakeTimeout = connectionProfile.getHandshakeTimeout() == null ?
connectTimeout : connectionProfile.getHandshakeTimeout();
final Version version = executeHandshake(node, channel, handshakeTimeout);
transportServiceAdapter.onConnectionOpened(node);
nodeChannels = new NodeChannels(nodeChannels, version);// clone the channels - we now have the correct version
success = true;
return nodeChannels;
} catch (ConnectTransportException e) {
throw e;
} catch (Exception e) {
// ConnectTransportExceptions are handled specifically on the caller end - we wrap the actual exception to ensure
// only relevant exceptions are logged on the caller end.. this is the same as in connectToNode
throw new ConnectTransportException(node, "general node connection failure", e);
} finally {
if (success == false) {
IOUtils.closeWhileHandlingException(nodeChannels);
ensureOpen();
try {
nodeChannels = connectToChannels(node, connectionProfile);
final Channel channel = nodeChannels.getChannels().get(0); // one channel is guaranteed by the connection profile
final TimeValue connectTimeout = connectionProfile.getConnectTimeout() == null ?
defaultConnectionProfile.getConnectTimeout() :
connectionProfile.getConnectTimeout();
final TimeValue handshakeTimeout = connectionProfile.getHandshakeTimeout() == null ?
connectTimeout : connectionProfile.getHandshakeTimeout();
final Version version = executeHandshake(node, channel, handshakeTimeout);
transportServiceAdapter.onConnectionOpened(node);
nodeChannels = new NodeChannels(nodeChannels, version);// clone the channels - we now have the correct version
success = true;
return nodeChannels;
} catch (ConnectTransportException e) {
throw e;
} catch (Exception e) {
// ConnectTransportExceptions are handled specifically on the caller end - we wrap the actual exception to ensure
// only relevant exceptions are logged on the caller end.. this is the same as in connectToNode
throw new ConnectTransportException(node, "general node connection failure", e);
} finally {
if (success == false) {
IOUtils.closeWhileHandlingException(nodeChannels);
}
}
} finally {
globalLock.readLock().unlock();
}
}
@ -1577,4 +1583,14 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
}
}
}
/**
* Ensures this transport is still started / open
* @throws IllegalStateException if the transport is not started / open
*/
protected final void ensureOpen() {
if (lifecycle.started() == false) {
throw new IllegalStateException("transport has been stopped");
}
}
}

View File

@ -49,10 +49,10 @@ import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
@ -76,7 +76,7 @@ public class MockTcpTransport extends TcpTransport<MockTcpTransport.MockChannel>
*/
public static final ConnectionProfile LIGHT_PROFILE;
private final Map<MockChannel, Boolean> openChannels = new IdentityHashMap<>();
private final Set<MockChannel> openChannels = new HashSet<>();
static {
ConnectionProfile.Builder builder = new ConnectionProfile.Builder();
@ -289,7 +289,7 @@ public class MockTcpTransport extends TcpTransport<MockTcpTransport.MockChannel>
this.profile = profile;
this.onClose = () -> onClose.accept(this);
synchronized (openChannels) {
openChannels.put(this, Boolean.TRUE);
openChannels.add(this);
}
}
@ -305,6 +305,9 @@ public class MockTcpTransport extends TcpTransport<MockTcpTransport.MockChannel>
this.profile = profile;
this.activeChannel = null;
this.onClose = null;
synchronized (openChannels) {
openChannels.add(this);
}
}
public void accept(Executor executor) throws IOException {
@ -313,10 +316,10 @@ public class MockTcpTransport extends TcpTransport<MockTcpTransport.MockChannel>
MockChannel incomingChannel = null;
try {
configureSocket(incomingSocket);
incomingChannel = new MockChannel(incomingSocket, localAddress, profile, workerChannels::remove);
//establish a happens-before edge between closing and accepting a new connection
synchronized (this) {
if (isOpen.get()) {
incomingChannel = new MockChannel(incomingSocket, localAddress, profile, workerChannels::remove);
//establish a happens-before edge between closing and accepting a new connection
workerChannels.put(incomingChannel, Boolean.TRUE);
// this spawns a new thread immediately, so OK under lock
incomingChannel.loopRead(executor);
@ -360,7 +363,7 @@ public class MockTcpTransport extends TcpTransport<MockTcpTransport.MockChannel>
@Override
public void close() throws IOException {
if (isOpen.compareAndSet(true, false)) {
final Boolean removedChannel;
final boolean removedChannel;
synchronized (openChannels) {
removedChannel = openChannels.remove(this);
}
@ -370,9 +373,19 @@ public class MockTcpTransport extends TcpTransport<MockTcpTransport.MockChannel>
IOUtils.close(serverSocket, activeChannel, () -> IOUtils.close(workerChannels.keySet()),
() -> cancellableThreads.cancel("channel closed"), onClose);
}
assert removedChannel : "Channel was not removed or removed twice?";
assert removedChannel: "Channel was not removed or removed twice?";
}
}
@Override
public String toString() {
return "MockChannel{" +
"profile='" + profile + '\'' +
", isOpen=" + isOpen +
", localAddress=" + localAddress +
", isServerSocket=" + (serverSocket != null) +
'}';
}
}