improve threading usage on discovery and fd
This commit is contained in:
parent
73e6aa72cd
commit
8fef3df16f
|
@ -208,9 +208,13 @@ public class MasterFaultDetection extends AbstractComponent {
|
|||
}
|
||||
|
||||
private void notifyDisconnectedFromMaster() {
|
||||
for (Listener listener : listeners) {
|
||||
listener.onDisconnectedFromMaster();
|
||||
}
|
||||
threadPool.cached().execute(new Runnable() {
|
||||
@Override public void run() {
|
||||
for (Listener listener : listeners) {
|
||||
listener.onDisconnectedFromMaster();
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private void notifyMasterFailure(final DiscoveryNode masterNode, final String reason) {
|
||||
|
@ -297,6 +301,10 @@ public class MasterFaultDetection extends AbstractComponent {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override public boolean spawn() {
|
||||
return false; // no need to spawn, we hardly do anything
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
|
|
@ -176,10 +176,14 @@ public class NodesFaultDetection extends AbstractComponent {
|
|||
}
|
||||
}
|
||||
|
||||
private void notifyNodeFailure(DiscoveryNode node, String reason) {
|
||||
for (Listener listener : listeners) {
|
||||
listener.onNodeFailure(node, reason);
|
||||
}
|
||||
private void notifyNodeFailure(final DiscoveryNode node, final String reason) {
|
||||
threadPool.cached().execute(new Runnable() {
|
||||
@Override public void run() {
|
||||
for (Listener listener : listeners) {
|
||||
listener.onNodeFailure(node, reason);
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private class SendPingRequest implements Runnable {
|
||||
|
@ -232,6 +236,10 @@ public class NodesFaultDetection extends AbstractComponent {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override public boolean spawn() {
|
||||
return false; // no need to spawn, we hardly do anything
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
|
|
@ -262,6 +262,10 @@ public class MulticastZenPing extends AbstractLifecycleComponent<ZenPing> implem
|
|||
}
|
||||
channel.sendResponse(VoidStreamable.INSTANCE);
|
||||
}
|
||||
|
||||
@Override public boolean spawn() {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
static class MulticastPingResponse implements Streamable {
|
||||
|
@ -340,7 +344,7 @@ public class MulticastZenPing extends AbstractLifecycleComponent<ZenPing> implem
|
|||
|
||||
if (!transportService.nodeConnected(requestingNode)) {
|
||||
// do the connect and send on a thread pool
|
||||
threadPool.execute(new Runnable() {
|
||||
threadPool.cached().execute(new Runnable() {
|
||||
@Override public void run() {
|
||||
// connect to the node if possible
|
||||
try {
|
||||
|
|
|
@ -240,6 +240,10 @@ public class UnicastZenPing extends AbstractLifecycleComponent<ZenPing> implemen
|
|||
logger.warn("failed to send ping to [{}]", exp, node);
|
||||
}
|
||||
}
|
||||
|
||||
@Override public boolean spawn() {
|
||||
return false;
|
||||
}
|
||||
});
|
||||
}
|
||||
if (wait) {
|
||||
|
@ -282,6 +286,10 @@ public class UnicastZenPing extends AbstractLifecycleComponent<ZenPing> implemen
|
|||
@Override public void messageReceived(UnicastPingRequest request, TransportChannel channel) throws Exception {
|
||||
channel.sendResponse(handlePingRequest(request));
|
||||
}
|
||||
|
||||
@Override public boolean spawn() {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
static class UnicastPingRequest implements Streamable {
|
||||
|
|
Loading…
Reference in New Issue