diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/discovery/simple/SimpleDiscoveryAgent.java b/activemq-core/src/main/java/org/apache/activemq/transport/discovery/simple/SimpleDiscoveryAgent.java index 9080b86d82..804fc76668 100755 --- a/activemq-core/src/main/java/org/apache/activemq/transport/discovery/simple/SimpleDiscoveryAgent.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/discovery/simple/SimpleDiscoveryAgent.java @@ -23,6 +23,8 @@ import org.apache.activemq.command.DiscoveryEvent; import org.apache.activemq.transport.discovery.DiscoveryAgent; import org.apache.activemq.transport.discovery.DiscoveryListener; +import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean; + /** * A simple DiscoveryAgent that allows static configuration of the discovered services. * @@ -30,9 +32,31 @@ import org.apache.activemq.transport.discovery.DiscoveryListener; */ public class SimpleDiscoveryAgent implements DiscoveryAgent { + private long initialReconnectDelay = 1000*5; + private long maxReconnectDelay = 1000 * 30; + private long backOffMultiplier = 2; + private boolean useExponentialBackOff = false; + private int maxReconnectAttempts; + private final Object sleepMutex = new Object(); + private long minConnectTime = 500; + private DiscoveryListener listener; String services[] = new String[] {}; String group = "DEFAULT"; + private final AtomicBoolean running = new AtomicBoolean(false); + + class SimpleDiscoveryEvent extends DiscoveryEvent { + + private int connectFailures; + private long reconnectDelay = initialReconnectDelay; + private long connectTime = System.currentTimeMillis(); + private AtomicBoolean failed = new AtomicBoolean(false); + + public SimpleDiscoveryEvent(String service) { + super(service); + } + + } public void setDiscoveryListener(DiscoveryListener listener) { this.listener = listener; @@ -42,12 +66,17 @@ public class SimpleDiscoveryAgent implements DiscoveryAgent { } public void start() throws Exception { + running.set(true); for (int i = 0; i < services.length; i++) { - listener.onServiceAdd(new DiscoveryEvent(services[i])); + listener.onServiceAdd(new SimpleDiscoveryEvent(services[i])); } } public void stop() throws Exception { + running.set(false); + synchronized(sleepMutex) { + sleepMutex.notifyAll(); + } } public String[] getServices() { @@ -80,7 +109,112 @@ public class SimpleDiscoveryAgent implements DiscoveryAgent { public void setBrokerName(String brokerName) { } - public void serviceFailed(DiscoveryEvent event) throws IOException { + public void serviceFailed(DiscoveryEvent devent) throws IOException { + + final SimpleDiscoveryEvent event = (SimpleDiscoveryEvent) devent; + if( event.failed.compareAndSet(false, true) ) { + + listener.onServiceRemove(event); + Thread thread = new Thread() { + public void run() { + + + // We detect a failed connection attempt because the service fails right + // away. + if( event.connectTime + minConnectTime > System.currentTimeMillis() ) { + + event.connectFailures++; + + if( maxReconnectAttempts>0 && event.connectFailures >= maxReconnectAttempts ) { + // Don' try to re-connect + return; + } + + synchronized(sleepMutex){ + try{ + if( !running.get() ) + return; + + sleepMutex.wait(event.reconnectDelay); + }catch(InterruptedException ie){ + return; + } + } + + if (!useExponentialBackOff) { + event.reconnectDelay = initialReconnectDelay; + } else { + // Exponential increment of reconnect delay. + event.reconnectDelay*=backOffMultiplier; + if(event.reconnectDelay>maxReconnectDelay) + event.reconnectDelay=maxReconnectDelay; + } + + } else { + event.connectFailures = 0; + event.reconnectDelay = initialReconnectDelay; + } + + if( !running.get() ) + return; + + event.connectTime = System.currentTimeMillis(); + event.failed.set(false); + + listener.onServiceAdd(event); + } + }; + thread.setDaemon(true); + thread.start(); + } } + + public long getBackOffMultiplier() { + return backOffMultiplier; + } + + public void setBackOffMultiplier(long backOffMultiplier) { + this.backOffMultiplier = backOffMultiplier; + } + + public long getInitialReconnectDelay() { + return initialReconnectDelay; + } + + public void setInitialReconnectDelay(long initialReconnectDelay) { + this.initialReconnectDelay = initialReconnectDelay; + } + + public int getMaxReconnectAttempts() { + return maxReconnectAttempts; + } + + public void setMaxReconnectAttempts(int maxReconnectAttempts) { + this.maxReconnectAttempts = maxReconnectAttempts; + } + + public long getMaxReconnectDelay() { + return maxReconnectDelay; + } + + public void setMaxReconnectDelay(long maxReconnectDelay) { + this.maxReconnectDelay = maxReconnectDelay; + } + + public long getMinConnectTime() { + return minConnectTime; + } + + public void setMinConnectTime(long minConnectTime) { + this.minConnectTime = minConnectTime; + } + + public boolean isUseExponentialBackOff() { + return useExponentialBackOff; + } + + public void setUseExponentialBackOff(boolean useExponentialBackOff) { + this.useExponentialBackOff = useExponentialBackOff; + } }