fork node connected and disconnected on a different thread pool

This commit is contained in:
kimchy 2010-06-17 12:47:42 +03:00
parent 662ded3801
commit 597f020156
3 changed files with 38 additions and 20 deletions

View File

@ -0,0 +1,9 @@
#discovery:
# zen:
# send_leave_request: false
# fd:
# register_connection_listener: false
# ping:
# unicast.hosts: ["localhost:9300", "localhost:9301", "localhost:9302", "localhost:9303", "localhost:9304", "localhost:9305", "localhost:9306"]
# multicast.enabled: false
#transport.netty.connections_per_node: 5

View File

@ -2,6 +2,7 @@ rootLogger: INFO, console, file
logger:
# log action execution errors for easier debugging
action : DEBUG
discovery.zen.fd: DEBUG
appender:
console:

View File

@ -260,32 +260,40 @@ public class TransportService extends AbstractLifecycleComponent<TransportServic
return holder.handler();
}
@Override public void raiseNodeConnected(DiscoveryNode node) {
for (TransportConnectionListener connectionListener : connectionListeners) {
connectionListener.onNodeConnected(node);
}
@Override public void raiseNodeConnected(final DiscoveryNode node) {
threadPool.execute(new Runnable() {
@Override public void run() {
for (TransportConnectionListener connectionListener : connectionListeners) {
connectionListener.onNodeConnected(node);
}
}
});
}
@Override public void raiseNodeDisconnected(final DiscoveryNode node) {
for (TransportConnectionListener connectionListener : connectionListeners) {
connectionListener.onNodeDisconnected(node);
}
// node got disconnected, raise disconnection on possible ongoing handlers
for (Map.Entry<Long, RequestHolder> entry : clientHandlers.entrySet()) {
RequestHolder holder = entry.getValue();
if (holder.node().equals(node)) {
final RequestHolder holderToNotify = clientHandlers.remove(entry.getKey());
if (holderToNotify != null) {
// callback that an exception happened, but on a different thread since we don't
// want handlers to worry about stack overflows
threadPool.execute(new Runnable() {
@Override public void run() {
holderToNotify.handler().handleException(new NodeDisconnectedTransportException(node, holderToNotify.action()));
threadPool.execute(new Runnable() {
@Override public void run() {
for (TransportConnectionListener connectionListener : connectionListeners) {
connectionListener.onNodeDisconnected(node);
}
// node got disconnected, raise disconnection on possible ongoing handlers
for (Map.Entry<Long, RequestHolder> entry : clientHandlers.entrySet()) {
RequestHolder holder = entry.getValue();
if (holder.node().equals(node)) {
final RequestHolder holderToNotify = clientHandlers.remove(entry.getKey());
if (holderToNotify != null) {
// callback that an exception happened, but on a different thread since we don't
// want handlers to worry about stack overflows
threadPool.execute(new Runnable() {
@Override public void run() {
holderToNotify.handler().handleException(new NodeDisconnectedTransportException(node, holderToNotify.action()));
}
});
}
});
}
}
}
}
});
}
}