Netty: Make sure channel closing never happens on i/o thread

Similar to NettyTransport.doStop() all actions which disconnect
from a node (and thus call awaitUnterruptibly) should not be executed
on the I/O thread.

This patch ensures that all disconnects happen in the generic threadpool, trying to avoid unnecessary `disconnectFromNode` calls.

Also added a missing return statement in case the component was not yet
started when catching an exception on the netty layer.

Closes #7726
This commit is contained in:
Alexander Reelsen 2014-09-11 13:39:24 +02:00 committed by Luca Cavanna
parent 2250f58757
commit ec86808fa9
1 changed files with 24 additions and 9 deletions

View File

@ -470,6 +470,7 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
if (!lifecycle.started()) {
// ignore
return;
}
if (isCloseConnectionException(e.getCause())) {
logger.trace("close connection exception caught on transport layer [{}], disconnecting from relevant node", e.getCause(), ctx.getChannel());
@ -794,14 +795,20 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
/**
* Disconnects from a node if a channel is found as part of that nodes channels.
*/
private void disconnectFromNodeChannel(Channel channel, Throwable failure) {
for (DiscoveryNode node : connectedNodes.keySet()) {
if (disconnectFromNode(node, channel, ExceptionsHelper.detailedMessage(failure))) {
// if we managed to find this channel and disconnect from it, then break, no need to check on
// the rest of the nodes
break;
private void disconnectFromNodeChannel(final Channel channel, final Throwable failure) {
threadPool().generic().execute(new Runnable() {
@Override
public void run() {
for (DiscoveryNode node : connectedNodes.keySet()) {
if (disconnectFromNode(node, channel, ExceptionsHelper.detailedMessage(failure))) {
// if we managed to find this channel and disconnect from it, then break, no need to check on
// the rest of the nodes
break;
}
}
}
}
});
}
private Channel nodeChannel(DiscoveryNode node, TransportRequestOptions options) throws ConnectTransportException {
@ -885,8 +892,16 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
}
@Override
public void operationComplete(ChannelFuture future) throws Exception {
disconnectFromNode(node, future.getChannel(), "channel closed event");
public void operationComplete(final ChannelFuture future) throws Exception {
NodeChannels nodeChannels = connectedNodes.get(node);
if (nodeChannels != null && nodeChannels.hasChannel(future.getChannel())) {
threadPool().generic().execute(new Runnable() {
@Override
public void run() {
disconnectFromNode(node, future.getChannel(), "channel closed event");
}
});
}
}
}