Fix for AMQ-1342 - Added backoff delay in generating discovery events when broker failures are reported

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@559132 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Hiram R. Chirino 2007-07-24 17:43:26 +00:00
parent 97a591f08f
commit 7c94a73ba6
1 changed files with 192 additions and 50 deletions

View File

@ -57,11 +57,102 @@ public class MulticastDiscoveryAgent implements DiscoveryAgent,Runnable{
private static final String DELIMITER = "%";
private static final int BUFF_SIZE=8192;
private static final int DEFAULT_IDLE_TIME=500;
private static final int HEARTBEAT_MISS_BEFORE_DEATH=4;
private static final int HEARTBEAT_MISS_BEFORE_DEATH=10;
private long initialReconnectDelay = 1000*5;
private long maxReconnectDelay = 1000 * 30;
private long backOffMultiplier = 2;
private boolean useExponentialBackOff = false;
private int maxReconnectAttempts;
class RemoteBrokerData {
final String brokerName;
final String service;
long lastHeartBeat;
long recoveryTime;
int failureCount;
boolean failed;
public RemoteBrokerData(String brokerName, String service) {
this.brokerName=brokerName;
this.service=service;
this.lastHeartBeat=System.currentTimeMillis();
}
synchronized public void updateHeartBeat() {
lastHeartBeat= System.currentTimeMillis();
// Consider that the broker recovery has succeeded if it has not failed in 60 seconds.
if( !failed && failureCount>0 && (lastHeartBeat-recoveryTime) > 1000*60 ) {
if(log.isDebugEnabled())
log.debug("I now think that the "+service+" service has recovered.");
failureCount=0;
recoveryTime=0;
}
}
synchronized public long getLastHeartBeat() {
return lastHeartBeat;
}
synchronized public boolean markFailed() {
if ( !failed ) {
failed=true;
failureCount++;
long reconnectDelay;
if (!useExponentialBackOff) {
reconnectDelay = initialReconnectDelay;
} else {
reconnectDelay = (long)Math.pow(backOffMultiplier, failureCount);
if(reconnectDelay>maxReconnectDelay)
reconnectDelay=maxReconnectDelay;
}
if(log.isDebugEnabled())
log.debug("Remote failure of "+service+" while still receiving multicast advertisements. Advertising events will be suppressed for "+reconnectDelay+" ms, the current failure count is: "+failureCount);
recoveryTime = System.currentTimeMillis()+reconnectDelay;
return true;
}
return false;
}
/**
* @return true if this broker is marked failed and it is now the right time to start recovery.
*/
synchronized public boolean doRecovery() {
if( !failed )
return false;
// Are we done trying to recover this guy?
if( maxReconnectAttempts>0 && failureCount > maxReconnectAttempts ) {
if(log.isDebugEnabled())
log.debug("Max reconnect attempts of the "+service+" service has been reached.");
return false;
}
// Is it not yet time?
if( System.currentTimeMillis() < recoveryTime )
return false;
if(log.isDebugEnabled())
log.debug("Resuming event advertisement of the "+service+" service.");
failed=false;
return true;
}
public boolean isFailed() {
return failed;
}
}
private int timeToLive=1;
private boolean loopBackMode=false;
private Map services=new ConcurrentHashMap();
private Map brokers = new ConcurrentHashMap();
private Map brokersByService=new ConcurrentHashMap();
private String group="default";
private String brokerName;
private URI discoveryURI;
@ -325,66 +416,37 @@ public class MulticastDiscoveryAgent implements DiscoveryAgent,Runnable{
private void processAlive(String brokerName,String service){
if(selfService == null || !service.equals(selfService)){
AtomicLong lastKeepAlive=(AtomicLong) services.get(service);
if(lastKeepAlive==null){
brokers.put(service, brokerName);
if(discoveryListener!=null){
final DiscoveryEvent event=new DiscoveryEvent(service);
event.setBrokerName(brokerName);
// Have the listener process the event async so that
// he does not block this thread since we are doing time sensitive
// processing of events.
executor.execute(new Runnable() {
public void run() {
DiscoveryListener discoveryListener = MulticastDiscoveryAgent.this.discoveryListener;
if(discoveryListener!=null){
discoveryListener.onServiceAdd(event);
}
}
});
}
lastKeepAlive=new AtomicLong(System.currentTimeMillis());
services.put(service,lastKeepAlive);
RemoteBrokerData data = (RemoteBrokerData)brokersByService.get(service);
if(data==null){
data = new RemoteBrokerData(brokerName, service);
brokersByService.put(service,data);;
fireServiceAddEvent(data);
doAdvertizeSelf();
} else {
data.updateHeartBeat();
if( data.doRecovery() ) {
fireServiceAddEvent(data);
}
}
lastKeepAlive.set(System.currentTimeMillis());
}
}
private void processDead(String brokerName,String service){
if(!service.equals(selfService)){
if(services.remove(service)!=null){
brokers.remove(service);
if(discoveryListener!=null){
final DiscoveryEvent event=new DiscoveryEvent(service);
event.setBrokerName(brokerName);
// Have the listener process the event async so that
// he does not block this thread since we are doing time sensitive
// processing of events.
executor.execute(new Runnable() {
public void run() {
DiscoveryListener discoveryListener = MulticastDiscoveryAgent.this.discoveryListener;
if(discoveryListener!=null){
discoveryListener.onServiceRemove(event);
}
}
});
}
RemoteBrokerData data = (RemoteBrokerData) brokersByService.remove(service);
if(data!=null && !data.isFailed() ){
fireServiceRemovedEvent(data);
}
}
}
private void doExpireOldServices(){
long expireTime=System.currentTimeMillis()-(keepAliveInterval*HEARTBEAT_MISS_BEFORE_DEATH);
for(Iterator i=services.entrySet().iterator();i.hasNext();){
Map.Entry entry=(Map.Entry) i.next();
AtomicLong lastHeartBeat=(AtomicLong) entry.getValue();
if(lastHeartBeat.get()<expireTime){
String brokerName = (String)brokers.get(entry.getKey());
processDead(brokerName,entry.getKey().toString());
for(Iterator i=brokersByService.values().iterator();i.hasNext();){
RemoteBrokerData data=(RemoteBrokerData)i.next();
if( data.getLastHeartBeat() < expireTime){
processDead(brokerName, data.service);
}
}
}
@ -400,6 +462,86 @@ public class MulticastDiscoveryAgent implements DiscoveryAgent,Runnable{
}
public void serviceFailed(DiscoveryEvent event) throws IOException {
processDead(event.getBrokerName(), event.getServiceName());
RemoteBrokerData data = (RemoteBrokerData)brokersByService.get(event.getServiceName());
if(data!=null && data.markFailed() ) {
fireServiceRemovedEvent(data);
}
}
private void fireServiceRemovedEvent(RemoteBrokerData data) {
if( discoveryListener!=null){
final DiscoveryEvent event=new DiscoveryEvent(data.service);
event.setBrokerName(data.brokerName);
// Have the listener process the event async so that
// he does not block this thread since we are doing time sensitive
// processing of events.
executor.execute(new Runnable() {
public void run() {
DiscoveryListener discoveryListener = MulticastDiscoveryAgent.this.discoveryListener;
if(discoveryListener!=null){
discoveryListener.onServiceRemove(event);
}
}
});
}
}
private void fireServiceAddEvent(RemoteBrokerData data) {
if( discoveryListener!=null){
final DiscoveryEvent event=new DiscoveryEvent(data.service);
event.setBrokerName(data.brokerName);
// Have the listener process the event async so that
// he does not block this thread since we are doing time sensitive
// processing of events.
executor.execute(new Runnable() {
public void run() {
DiscoveryListener discoveryListener = MulticastDiscoveryAgent.this.discoveryListener;
if(discoveryListener!=null){
discoveryListener.onServiceAdd(event);
}
}
});
}
}
public long getBackOffMultiplier() {
return backOffMultiplier;
}
public void setBackOffMultiplier(long backOffMultiplier) {
this.backOffMultiplier = backOffMultiplier;
}
public long getInitialReconnectDelay() {
return initialReconnectDelay;
}
public void setInitialReconnectDelay(long initialReconnectDelay) {
this.initialReconnectDelay = initialReconnectDelay;
}
public int getMaxReconnectAttempts() {
return maxReconnectAttempts;
}
public void setMaxReconnectAttempts(int maxReconnectAttempts) {
this.maxReconnectAttempts = maxReconnectAttempts;
}
public long getMaxReconnectDelay() {
return maxReconnectDelay;
}
public void setMaxReconnectDelay(long maxReconnectDelay) {
this.maxReconnectDelay = maxReconnectDelay;
}
public boolean isUseExponentialBackOff() {
return useExponentialBackOff;
}
public void setUseExponentialBackOff(boolean useExponentialBackOff) {
this.useExponentialBackOff = useExponentialBackOff;
}
}