Catch RejectedExecutionException on node shutdown

This commit is contained in:
Simon Willnauer 2013-08-08 13:10:13 +02:00
parent ef365098e7
commit 04b23a8fab
1 changed files with 22 additions and 17 deletions

View File

@ -38,6 +38,7 @@ import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicLong;
@ -292,25 +293,29 @@ public class TransportService extends AbstractLifecycleComponent<TransportServic
threadPool.generic().execute(new Runnable() {
@Override
public void run() {
for (TransportConnectionListener connectionListener : connectionListeners) {
connectionListener.onNodeDisconnected(node);
}
// node got disconnected, raise disconnection on possible ongoing handlers
for (Map.Entry<Long, RequestHolder> entry : clientHandlers.entrySet()) {
RequestHolder holder = entry.getValue();
if (holder.node().equals(node)) {
final RequestHolder holderToNotify = clientHandlers.remove(entry.getKey());
if (holderToNotify != null) {
// callback that an exception happened, but on a different thread since we don't
// want handlers to worry about stack overflows
threadPool.generic().execute(new Runnable() {
@Override
public void run() {
holderToNotify.handler().handleException(new NodeDisconnectedException(node, holderToNotify.action()));
}
});
try {
for (TransportConnectionListener connectionListener : connectionListeners) {
connectionListener.onNodeDisconnected(node);
}
// node got disconnected, raise disconnection on possible ongoing handlers
for (Map.Entry<Long, RequestHolder> entry : clientHandlers.entrySet()) {
RequestHolder holder = entry.getValue();
if (holder.node().equals(node)) {
final RequestHolder holderToNotify = clientHandlers.remove(entry.getKey());
if (holderToNotify != null) {
// callback that an exception happened, but on a different thread since we don't
// want handlers to worry about stack overflows
threadPool.generic().execute(new Runnable() {
@Override
public void run() {
holderToNotify.handler().handleException(new NodeDisconnectedException(node, holderToNotify.action()));
}
});
}
}
}
} catch (RejectedExecutionException ex) {
logger.debug("Rejected execution on NodeDisconnected", ex);
}
}
});