Simplify connection closing and cleanups in TcpTransport (#25250)

Today we maintain a map of open connections in order to close them when
a low level channel gets closed or handles a failure. We also spawn a thread due to some
tricky concurrency issues especially with respect to netty since they listener might
be called on a transport / boss thread. Executions on those threads must not be blocking
since otherwise we will likely deadlock the event processing which adds to the
complexity of the concurrency model in this class.

This change associates the connection with the close callback that every channel invokes
once it's closed which allows us to remove the connections map. A relaxed non-blocking
concurrency model in the connection close listener allows cleaning up connected nodes without
blocking on any lock.
This commit is contained in:
Simon Willnauer 2017-06-19 09:19:45 +02:00 committed by GitHub
parent 9eca380a76
commit dc02b32650
2 changed files with 79 additions and 128 deletions

View File

@ -87,6 +87,7 @@ import java.util.Collections;
import java.util.EnumMap;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@ -113,7 +114,6 @@ import static org.elasticsearch.common.settings.Setting.timeSetting;
import static org.elasticsearch.common.transport.NetworkExceptionHelper.isCloseConnectionException;
import static org.elasticsearch.common.transport.NetworkExceptionHelper.isConnectException;
import static org.elasticsearch.common.util.concurrent.ConcurrentCollections.newConcurrentMap;
import static org.elasticsearch.common.util.concurrent.ConcurrentCollections.newConcurrentSet;
public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent implements Transport {
@ -161,7 +161,6 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
protected volatile TransportServiceAdapter transportServiceAdapter;
// node id to actual channel
protected final ConcurrentMap<DiscoveryNode, NodeChannels> connectedNodes = newConcurrentMap();
private final Set<NodeChannels> openConnections = newConcurrentSet();
protected final Map<String, List<Channel>> serverChannels = newConcurrentMap();
protected final ConcurrentMap<String, BoundTransportAddress> profileBoundAddresses = newConcurrentMap();
@ -171,7 +170,7 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
// this lock is here to make sure we close this transport and disconnect all the client nodes
// connections while no connect operations is going on... (this might help with 100% CPU when stopping the transport?)
protected final ReadWriteLock globalLock = new ReentrantReadWriteLock();
protected final ReadWriteLock closeLock = new ReentrantReadWriteLock();
protected final boolean compress;
protected volatile BoundTransportAddress boundAddress;
private final String transportName;
@ -390,15 +389,6 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
return version;
}
public boolean hasChannel(Channel channel) {
for (Channel channel1 : channels) {
if (channel.equals(channel1)) {
return true;
}
}
return false;
}
public List<Channel> getChannels() {
return Arrays.asList(channels);
}
@ -412,12 +402,12 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
}
@Override
public synchronized void close() throws IOException {
public void close() throws IOException {
if (closed.compareAndSet(false, true)) {
try {
closeChannels(Arrays.stream(channels).filter(Objects::nonNull).collect(Collectors.toList()));
} finally {
onNodeChannelsClosed(this);
transportServiceAdapter.onConnectionClosed(this);
}
}
}
@ -436,6 +426,10 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
Channel channel = channel(options.type());
sendRequestToChannel(this.node, channel, requestId, action, request, options, getVersion(), (byte) 0);
}
boolean isClosed() {
return closed.get();
}
}
@Override
@ -451,7 +445,7 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
if (node == null) {
throw new ConnectTransportException(null, "can't connect to a null node");
}
globalLock.readLock().lock(); // ensure we don't open connections while we are closing
closeLock.readLock().lock(); // ensure we don't open connections while we are closing
try {
ensureOpen();
try (Releasable ignored = connectionLock.acquire(node.getId())) {
@ -468,7 +462,24 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
if (logger.isDebugEnabled()) {
logger.debug("connected to node [{}]", node);
}
try {
transportServiceAdapter.onNodeConnected(node);
} finally {
if (nodeChannels.isClosed()) {
// we got closed concurrently due to a disconnect or some other event on the channel.
// the close callback will close the NodeChannel instance first and then try to remove
// the connection from the connected nodes. It will NOT acquire the connectionLock for
// the node to prevent any blocking calls on network threads. Yet, we still establish a happens
// before relationship to the connectedNodes.put since we check if we can remove the
// (DiscoveryNode, NodeChannels) tuple from the map after we closed. Here we check if it's closed an if so we
// try to remove it first either way one of the two wins even if the callback has run before we even added the
// tuple to the map since in that case we remove it here again
if (connectedNodes.remove(node, nodeChannels)) {
transportServiceAdapter.onNodeDisconnected(node);
}
throw new NodeNotConnectedException(node, "connection concurrently closed");
}
}
success = true;
} catch (ConnectTransportException e) {
throw e;
@ -484,7 +495,7 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
}
}
} finally {
globalLock.readLock().unlock();
closeLock.readLock().unlock();
}
}
@ -519,11 +530,12 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
boolean success = false;
NodeChannels nodeChannels = null;
connectionProfile = resolveConnectionProfile(connectionProfile, defaultConnectionProfile);
globalLock.readLock().lock(); // ensure we don't open connections while we are closing
closeLock.readLock().lock(); // ensure we don't open connections while we are closing
try {
ensureOpen();
try {
AtomicBoolean runOnce = new AtomicBoolean(false);
final AtomicBoolean runOnce = new AtomicBoolean(false);
final AtomicReference<NodeChannels> connectionRef = new AtomicReference<>();
Consumer<Channel> onClose = c -> {
assert isOpen(c) == false : "channel is still open when onClose is called";
try {
@ -532,7 +544,10 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
// we only need to disconnect from the nodes once since all other channels
// will also try to run this we protect it from running multiple times.
if (runOnce.compareAndSet(false, true)) {
disconnectFromNodeChannel(c, "channel closed");
NodeChannels connection = connectionRef.get();
if (connection != null) {
disconnectFromNodeCloseAndNotify(node, connection);
}
}
}
};
@ -546,7 +561,7 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
final Version version = executeHandshake(node, channel, handshakeTimeout);
nodeChannels = new NodeChannels(nodeChannels, version); // clone the channels - we now have the correct version
transportServiceAdapter.onConnectionOpened(nodeChannels);
openConnections.add(nodeChannels);
connectionRef.set(nodeChannels);
success = true;
return nodeChannels;
} catch (ConnectTransportException e) {
@ -561,77 +576,38 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
}
}
} finally {
globalLock.readLock().unlock();
closeLock.readLock().unlock();
}
}
/**
* Disconnects from a node, only if the relevant channel is found to be part of the node channels.
*/
protected boolean disconnectFromNode(DiscoveryNode node, Channel channel, String reason) {
// this might be called multiple times from all the node channels, so do a lightweight
// check outside of the lock
NodeChannels nodeChannels = connectedNodes.get(node);
if (nodeChannels != null && nodeChannels.hasChannel(channel)) {
try (Releasable ignored = connectionLock.acquire(node.getId())) {
nodeChannels = connectedNodes.get(node);
// check again within the connection lock, if its still applicable to remove it
if (nodeChannels != null && nodeChannels.hasChannel(channel)) {
connectedNodes.remove(node);
closeAndNotify(node, nodeChannels, reason);
return true;
}
}
}
return false;
}
private void closeAndNotify(DiscoveryNode node, NodeChannels nodeChannels, String reason) {
private void disconnectFromNodeCloseAndNotify(DiscoveryNode node, NodeChannels nodeChannels) {
assert nodeChannels != null : "nodeChannels must not be null";
try {
logger.debug("disconnecting from [{}], {}", node, reason);
IOUtils.closeWhileHandlingException(nodeChannels);
} finally {
logger.trace("disconnected from [{}], {}", node, reason);
if (closeLock.readLock().tryLock()) {
try {
if (connectedNodes.remove(node, nodeChannels)) {
transportServiceAdapter.onNodeDisconnected(node);
}
} finally {
closeLock.readLock().unlock();
}
}
}
}
/**
* Disconnects from a node if a channel is found as part of that nodes channels.
*/
protected final void disconnectFromNodeChannel(final Channel channel, final String reason) {
threadPool.generic().execute(() -> {
try {
protected final void closeChannelWhileHandlingExceptions(final Channel channel) {
if (isOpen(channel)) {
try {
closeChannels(Collections.singletonList(channel));
}
} catch (IOException e) {
logger.warn("failed to close channel", e);
} finally {
outer:
{
for (Map.Entry<DiscoveryNode, NodeChannels> entry : connectedNodes.entrySet()) {
if (disconnectFromNode(entry.getKey(), channel, reason)) {
// if we managed to find this channel and disconnect from it, then break, no need to check on
// the rest of the nodes
// #onNodeChannelsClosed will remove it..
assert openConnections.contains(entry.getValue()) == false : "NodeChannel#close should remove the connetion";
// we can only be connected and published to a single node with one connection. So if disconnectFromNode
// returns true we can safely break out from here since we cleaned up everything needed
break outer;
}
}
// now if we haven't found the right connection in the connected nodes we have to go through the open connections
// it might be that the channel belongs to a connection that is not published
for (NodeChannels channels : openConnections) {
if (channels.hasChannel(channel)) {
IOUtils.closeWhileHandlingException(channels);
break;
}
}
}
}
});
}
@Override
@ -645,10 +621,14 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
@Override
public void disconnectFromNode(DiscoveryNode node) {
closeLock.readLock().lock();
NodeChannels nodeChannels = null;
try (Releasable ignored = connectionLock.acquire(node.getId())) {
NodeChannels nodeChannels = connectedNodes.remove(node);
if (nodeChannels != null) {
closeAndNotify(node, nodeChannels, "due to explicit disconnect call");
nodeChannels = connectedNodes.remove(node);
} finally {
closeLock.readLock().unlock();
if (nodeChannels != null) { // if we found it and removed it we close and notify
IOUtils.closeWhileHandlingException(nodeChannels, () -> transportServiceAdapter.onNodeDisconnected(node));
}
}
}
@ -921,7 +901,7 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
final CountDownLatch latch = new CountDownLatch(1);
// make sure we run it on another thread than a possible IO handler thread
threadPool.generic().execute(() -> {
globalLock.writeLock().lock();
closeLock.writeLock().lock();
try {
// first stop to accept any incoming connections so nobody can connect to this transport
for (Map.Entry<String, List<Channel>> entry : serverChannels.entrySet()) {
@ -935,12 +915,19 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
}
// we are holding a write lock so nobody modifies the connectedNodes / openConnections map - it's safe to first close
// all instances and then clear them maps
IOUtils.closeWhileHandlingException(Iterables.concat(connectedNodes.values(), openConnections));
openConnections.clear();
connectedNodes.clear();
Iterator<Map.Entry<DiscoveryNode, NodeChannels>> iterator = connectedNodes.entrySet().iterator();
while (iterator.hasNext()) {
Map.Entry<DiscoveryNode, NodeChannels> next = iterator.next();
try {
IOUtils.closeWhileHandlingException(next.getValue());
transportServiceAdapter.onNodeDisconnected(next.getKey());
} finally {
iterator.remove();
}
}
stopInternal();
} finally {
globalLock.writeLock().unlock();
closeLock.writeLock().unlock();
latch.countDown();
}
});
@ -954,10 +941,9 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
}
protected void onException(Channel channel, Exception e) {
String reason = ExceptionsHelper.detailedMessage(e);
if (!lifecycle.started()) {
// just close and ignore - we are already stopped and just need to make sure we release all resources
disconnectFromNodeChannel(channel, reason);
closeChannelWhileHandlingExceptions(channel);
return;
}
@ -968,15 +954,15 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
channel),
e);
// close the channel, which will cause a node to be disconnected if relevant
disconnectFromNodeChannel(channel, reason);
closeChannelWhileHandlingExceptions(channel);
} else if (isConnectException(e)) {
logger.trace((Supplier<?>) () -> new ParameterizedMessage("connect exception caught on transport layer [{}]", channel), e);
// close the channel as safe measure, which will cause a node to be disconnected if relevant
disconnectFromNodeChannel(channel, reason);
closeChannelWhileHandlingExceptions(channel);
} else if (e instanceof BindException) {
logger.trace((Supplier<?>) () -> new ParameterizedMessage("bind exception caught on transport layer [{}]", channel), e);
// close the channel as safe measure, which will cause a node to be disconnected if relevant
disconnectFromNodeChannel(channel, reason);
closeChannelWhileHandlingExceptions(channel);
} else if (e instanceof CancelledKeyException) {
logger.trace(
(Supplier<?>) () -> new ParameterizedMessage(
@ -984,7 +970,7 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
channel),
e);
// close the channel as safe measure, which will cause a node to be disconnected if relevant
disconnectFromNodeChannel(channel, reason);
closeChannelWhileHandlingExceptions(channel);
} else if (e instanceof TcpTransport.HttpOnTransportException) {
// in case we are able to return data, serialize the exception content and sent it back to the client
if (isOpen(channel)) {
@ -1015,7 +1001,7 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
logger.warn(
(Supplier<?>) () -> new ParameterizedMessage("exception caught on transport layer [{}], closing connection", channel), e);
// close the channel, which will cause a node to be disconnected if relevant
disconnectFromNodeChannel(channel, reason);
closeChannelWhileHandlingExceptions(channel);
}
}
@ -1712,22 +1698,6 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
}
}
private void onNodeChannelsClosed(NodeChannels channels) {
// don't assert here since the channel / connection might not have been registered yet
final boolean remove = openConnections.remove(channels);
if (remove) {
transportServiceAdapter.onConnectionClosed(channels);
}
}
final int getNumOpenConnections() {
return openConnections.size();
}
final int getNumConnectedNodes() {
return connectedNodes.size();
}
/**
* Returns count of currently open connections
*/

View File

@ -169,8 +169,6 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
try {
assertNoPendingHandshakes(serviceA.getOriginalTransport());
assertNoPendingHandshakes(serviceB.getOriginalTransport());
assertPendingConnections(0, serviceA.getOriginalTransport());
assertPendingConnections(0, serviceB.getOriginalTransport());
} finally {
IOUtils.close(serviceA, serviceB, () -> {
try {
@ -194,12 +192,6 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
}
}
public void assertPendingConnections(int numConnections, Transport transport) {
if (transport instanceof TcpTransport) {
TcpTransport tcpTransport = (TcpTransport) transport;
assertEquals(numConnections, tcpTransport.getNumOpenConnections() - tcpTransport.getNumConnectedNodes());
}
}
public void testHelloWorld() {
serviceA.registerRequestHandler("sayHello", StringMessageRequest::new, ThreadPool.Names.GENERIC,
@ -2243,13 +2235,10 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
serviceB.sendRequest(connection, "action", new TestRequest("hello world"), TransportRequestOptions.EMPTY,
transportResponseHandler);
receivedLatch.await();
assertPendingConnections(1, serviceB.getOriginalTransport());
serviceC.close();
assertPendingConnections(0, serviceC.getOriginalTransport());
sendResponseLatch.countDown();
responseLatch.await();
}
assertPendingConnections(0, serviceC.getOriginalTransport());
}
public void testTransportStats() throws Exception {
@ -2340,14 +2329,10 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
assertEquals(2, stats.getTxCount());
assertEquals(46, stats.getRxSize().getBytes());
assertEquals(91, stats.getTxSize().getBytes());
} finally {
try {
assertPendingConnections(0, serviceC.getOriginalTransport());
} finally {
serviceC.close();
}
}
}
public void testTransportStatsWithException() throws Exception {
MockTransportService serviceC = build(Settings.builder().put("name", "TS_TEST").build(), version0, null, true);
@ -2442,12 +2427,8 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
// on the stack. The emphemeral port will always be in the same range
assertEquals(185 + addressLen, stats.getRxSize().getBytes());
assertEquals(91, stats.getTxSize().getBytes());
} finally {
try {
assertPendingConnections(0, serviceC.getOriginalTransport());
} finally {
serviceC.close();
}
}
}
}