mirror of https://github.com/apache/activemq.git
Added reconnect logic.
http://issues.apache.org/activemq/browse/AMQ-803 git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/branches/activemq-4.0@420717 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
fd6ae201af
commit
4b3b6186a4
|
@ -23,6 +23,8 @@ import org.apache.activemq.command.DiscoveryEvent;
|
||||||
import org.apache.activemq.transport.discovery.DiscoveryAgent;
|
import org.apache.activemq.transport.discovery.DiscoveryAgent;
|
||||||
import org.apache.activemq.transport.discovery.DiscoveryListener;
|
import org.apache.activemq.transport.discovery.DiscoveryListener;
|
||||||
|
|
||||||
|
import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A simple DiscoveryAgent that allows static configuration of the discovered services.
|
* A simple DiscoveryAgent that allows static configuration of the discovered services.
|
||||||
*
|
*
|
||||||
|
@ -30,9 +32,31 @@ import org.apache.activemq.transport.discovery.DiscoveryListener;
|
||||||
*/
|
*/
|
||||||
public class SimpleDiscoveryAgent implements DiscoveryAgent {
|
public class SimpleDiscoveryAgent implements DiscoveryAgent {
|
||||||
|
|
||||||
|
private long initialReconnectDelay = 1000*5;
|
||||||
|
private long maxReconnectDelay = 1000 * 30;
|
||||||
|
private long backOffMultiplier = 2;
|
||||||
|
private boolean useExponentialBackOff = false;
|
||||||
|
private int maxReconnectAttempts;
|
||||||
|
private final Object sleepMutex = new Object();
|
||||||
|
private long minConnectTime = 500;
|
||||||
|
|
||||||
private DiscoveryListener listener;
|
private DiscoveryListener listener;
|
||||||
String services[] = new String[] {};
|
String services[] = new String[] {};
|
||||||
String group = "DEFAULT";
|
String group = "DEFAULT";
|
||||||
|
private final AtomicBoolean running = new AtomicBoolean(false);
|
||||||
|
|
||||||
|
class SimpleDiscoveryEvent extends DiscoveryEvent {
|
||||||
|
|
||||||
|
private int connectFailures;
|
||||||
|
private long reconnectDelay = initialReconnectDelay;
|
||||||
|
private long connectTime = System.currentTimeMillis();
|
||||||
|
private AtomicBoolean failed = new AtomicBoolean(false);
|
||||||
|
|
||||||
|
public SimpleDiscoveryEvent(String service) {
|
||||||
|
super(service);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
public void setDiscoveryListener(DiscoveryListener listener) {
|
public void setDiscoveryListener(DiscoveryListener listener) {
|
||||||
this.listener = listener;
|
this.listener = listener;
|
||||||
|
@ -42,12 +66,17 @@ public class SimpleDiscoveryAgent implements DiscoveryAgent {
|
||||||
}
|
}
|
||||||
|
|
||||||
public void start() throws Exception {
|
public void start() throws Exception {
|
||||||
|
running.set(true);
|
||||||
for (int i = 0; i < services.length; i++) {
|
for (int i = 0; i < services.length; i++) {
|
||||||
listener.onServiceAdd(new DiscoveryEvent(services[i]));
|
listener.onServiceAdd(new SimpleDiscoveryEvent(services[i]));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public void stop() throws Exception {
|
public void stop() throws Exception {
|
||||||
|
running.set(false);
|
||||||
|
synchronized(sleepMutex) {
|
||||||
|
sleepMutex.notifyAll();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public String[] getServices() {
|
public String[] getServices() {
|
||||||
|
@ -80,7 +109,112 @@ public class SimpleDiscoveryAgent implements DiscoveryAgent {
|
||||||
public void setBrokerName(String brokerName) {
|
public void setBrokerName(String brokerName) {
|
||||||
}
|
}
|
||||||
|
|
||||||
public void serviceFailed(DiscoveryEvent event) throws IOException {
|
public void serviceFailed(DiscoveryEvent devent) throws IOException {
|
||||||
|
|
||||||
|
final SimpleDiscoveryEvent event = (SimpleDiscoveryEvent) devent;
|
||||||
|
if( event.failed.compareAndSet(false, true) ) {
|
||||||
|
|
||||||
|
listener.onServiceRemove(event);
|
||||||
|
Thread thread = new Thread() {
|
||||||
|
public void run() {
|
||||||
|
|
||||||
|
|
||||||
|
// We detect a failed connection attempt because the service fails right
|
||||||
|
// away.
|
||||||
|
if( event.connectTime + minConnectTime > System.currentTimeMillis() ) {
|
||||||
|
|
||||||
|
event.connectFailures++;
|
||||||
|
|
||||||
|
if( maxReconnectAttempts>0 && event.connectFailures >= maxReconnectAttempts ) {
|
||||||
|
// Don' try to re-connect
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
synchronized(sleepMutex){
|
||||||
|
try{
|
||||||
|
if( !running.get() )
|
||||||
|
return;
|
||||||
|
|
||||||
|
sleepMutex.wait(event.reconnectDelay);
|
||||||
|
}catch(InterruptedException ie){
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!useExponentialBackOff) {
|
||||||
|
event.reconnectDelay = initialReconnectDelay;
|
||||||
|
} else {
|
||||||
|
// Exponential increment of reconnect delay.
|
||||||
|
event.reconnectDelay*=backOffMultiplier;
|
||||||
|
if(event.reconnectDelay>maxReconnectDelay)
|
||||||
|
event.reconnectDelay=maxReconnectDelay;
|
||||||
|
}
|
||||||
|
|
||||||
|
} else {
|
||||||
|
event.connectFailures = 0;
|
||||||
|
event.reconnectDelay = initialReconnectDelay;
|
||||||
|
}
|
||||||
|
|
||||||
|
if( !running.get() )
|
||||||
|
return;
|
||||||
|
|
||||||
|
event.connectTime = System.currentTimeMillis();
|
||||||
|
event.failed.set(false);
|
||||||
|
|
||||||
|
listener.onServiceAdd(event);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
thread.setDaemon(true);
|
||||||
|
thread.start();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getBackOffMultiplier() {
|
||||||
|
return backOffMultiplier;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setBackOffMultiplier(long backOffMultiplier) {
|
||||||
|
this.backOffMultiplier = backOffMultiplier;
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getInitialReconnectDelay() {
|
||||||
|
return initialReconnectDelay;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setInitialReconnectDelay(long initialReconnectDelay) {
|
||||||
|
this.initialReconnectDelay = initialReconnectDelay;
|
||||||
|
}
|
||||||
|
|
||||||
|
public int getMaxReconnectAttempts() {
|
||||||
|
return maxReconnectAttempts;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setMaxReconnectAttempts(int maxReconnectAttempts) {
|
||||||
|
this.maxReconnectAttempts = maxReconnectAttempts;
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getMaxReconnectDelay() {
|
||||||
|
return maxReconnectDelay;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setMaxReconnectDelay(long maxReconnectDelay) {
|
||||||
|
this.maxReconnectDelay = maxReconnectDelay;
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getMinConnectTime() {
|
||||||
|
return minConnectTime;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setMinConnectTime(long minConnectTime) {
|
||||||
|
this.minConnectTime = minConnectTime;
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean isUseExponentialBackOff() {
|
||||||
|
return useExponentialBackOff;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setUseExponentialBackOff(boolean useExponentialBackOff) {
|
||||||
|
this.useExponentialBackOff = useExponentialBackOff;
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue