Network: A closed channel might not always fire up a close event

fixes #2733
This commit is contained in:
Shay Banon 2013-03-05 11:49:10 -08:00
parent acff102234
commit 9a25867bfe
1 changed files with 41 additions and 1 deletions

View File

@ -490,6 +490,7 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
}
// close the channel, which will cause a node to be disconnected if relevant
ctx.getChannel().close();
disconnectFromNodeChannel(ctx.getChannel(), e.getCause());
} else if (isConnectException(e.getCause()) || e.getCause() instanceof CancelledKeyException) {
if (logger.isTraceEnabled()) {
logger.trace("(ignoring) exception caught on transport layer [{}]", e.getCause(), ctx.getChannel());
@ -498,6 +499,7 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
logger.warn("exception caught on transport layer [{}], closing connection", e.getCause(), ctx.getChannel());
// close the channel, which will cause a node to be disconnected if relevant
ctx.getChannel().close();
disconnectFromNodeChannel(ctx.getChannel(), e.getCause());
}
}
@ -738,6 +740,44 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
}
}
/**
* Disconnects from a node, only if the relevant channel is found to be part of the node channels.
*/
private void disconnectFromNode(DiscoveryNode node, Channel channel, String reason) {
synchronized (connectLock(node.id())) {
NodeChannels nodeChannels = connectedNodes.get(node);
if (nodeChannels != null && nodeChannels.hasChannel(channel)) {
connectedNodes.remove(node);
try {
nodeChannels.close();
} finally {
logger.debug("disconnected from [{}], {}", node, reason);
transportServiceAdapter.raiseNodeDisconnected(node);
}
}
}
}
/**
* Disconnects from a node if a channel is found as part of that nodes channels.
*/
private void disconnectFromNodeChannel(Channel channel, Throwable failure) {
for (DiscoveryNode node : connectedNodes.keySet()) {
synchronized (connectLock(node.id())) {
NodeChannels nodeChannels = connectedNodes.get(node);
if (nodeChannels != null && nodeChannels.hasChannel(channel)) {
connectedNodes.remove(node);
try {
nodeChannels.close();
} finally {
logger.debug("disconnected from [{}] on channel failure", failure, node);
transportServiceAdapter.raiseNodeDisconnected(node);
}
}
}
}
}
private Channel nodeChannel(DiscoveryNode node, TransportRequestOptions options) throws ConnectTransportException {
NodeChannels nodeChannels = connectedNodes.get(node);
if (nodeChannels == null) {
@ -765,7 +805,7 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
@Override
public void operationComplete(ChannelFuture future) throws Exception {
disconnectFromNode(node);
disconnectFromNode(node, future.getChannel(), "channel closed event");
}
}