mirror of https://github.com/apache/activemq.git
Deliver the events to the DiscoveryListener async to the mutlicast/heartbeating thread so that that if the
DiscoveryListener blocks for while (like if he is being debuged) then we don't erroniously assume that memebrs int the group have timed out. git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@386506 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
24235a99e4
commit
d77e665ed7
|
@ -32,7 +32,13 @@ import org.apache.activemq.transport.discovery.DiscoveryAgent;
|
|||
import org.apache.activemq.transport.discovery.DiscoveryListener;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
||||
import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
|
||||
import edu.emory.mathcs.backport.java.util.concurrent.Executor;
|
||||
import edu.emory.mathcs.backport.java.util.concurrent.LinkedBlockingQueue;
|
||||
import edu.emory.mathcs.backport.java.util.concurrent.ThreadFactory;
|
||||
import edu.emory.mathcs.backport.java.util.concurrent.ThreadPoolExecutor;
|
||||
import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;
|
||||
import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
|
||||
import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicLong;
|
||||
/**
|
||||
|
@ -68,6 +74,14 @@ public class MulticastDiscoveryAgent implements DiscoveryAgent,Runnable{
|
|||
private long lastAdvertizeTime=0;
|
||||
private AtomicBoolean started=new AtomicBoolean(false);
|
||||
|
||||
private final Executor executor = new ThreadPoolExecutor(1, 1, 30, TimeUnit.SECONDS, new LinkedBlockingQueue(), new ThreadFactory() {
|
||||
public Thread newThread(Runnable runable) {
|
||||
Thread t = new Thread(runable, "Multicast Discovery Agent Notifier");
|
||||
t.setDaemon(true);
|
||||
return t;
|
||||
}
|
||||
});
|
||||
|
||||
/**
|
||||
* Set the discovery listener
|
||||
*
|
||||
|
@ -301,14 +315,25 @@ public class MulticastDiscoveryAgent implements DiscoveryAgent,Runnable{
|
|||
if(selfService == null || !service.equals(selfService)){
|
||||
AtomicLong lastKeepAlive=(AtomicLong) services.get(service);
|
||||
if(lastKeepAlive==null){
|
||||
lastKeepAlive=new AtomicLong(System.currentTimeMillis());
|
||||
services.put(service,lastKeepAlive);
|
||||
brokers.put(service, brokerName);
|
||||
if(discoveryListener!=null){
|
||||
DiscoveryEvent event=new DiscoveryEvent(service);
|
||||
final DiscoveryEvent event=new DiscoveryEvent(service);
|
||||
event.setBrokerName(brokerName);
|
||||
discoveryListener.onServiceAdd(event);
|
||||
|
||||
// Have the listener process the event async so that
|
||||
// he does not block this thread since we are doing time sensitive
|
||||
// processing of events.
|
||||
executor.execute(new Runnable() {
|
||||
public void run() {
|
||||
DiscoveryListener discoveryListener = MulticastDiscoveryAgent.this.discoveryListener;
|
||||
if(discoveryListener!=null){
|
||||
discoveryListener.onServiceAdd(event);
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
lastKeepAlive=new AtomicLong(System.currentTimeMillis());
|
||||
services.put(service,lastKeepAlive);
|
||||
doAdvertizeSelf();
|
||||
|
||||
}
|
||||
|
@ -321,9 +346,20 @@ public class MulticastDiscoveryAgent implements DiscoveryAgent,Runnable{
|
|||
if(services.remove(service)!=null){
|
||||
brokers.remove(service);
|
||||
if(discoveryListener!=null){
|
||||
DiscoveryEvent event=new DiscoveryEvent(service);
|
||||
final DiscoveryEvent event=new DiscoveryEvent(service);
|
||||
event.setBrokerName(brokerName);
|
||||
discoveryListener.onServiceRemove(event);
|
||||
|
||||
// Have the listener process the event async so that
|
||||
// he does not block this thread since we are doing time sensitive
|
||||
// processing of events.
|
||||
executor.execute(new Runnable() {
|
||||
public void run() {
|
||||
DiscoveryListener discoveryListener = MulticastDiscoveryAgent.this.discoveryListener;
|
||||
if(discoveryListener!=null){
|
||||
discoveryListener.onServiceRemove(event);
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue