mirror of https://github.com/apache/activemq.git
close local transport if remote transport fails (and supports failover), and re-establish local transport on a successful re-connect
git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@397589 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
b71a9f6a94
commit
a17b95182f
|
@ -57,6 +57,7 @@ import org.apache.activemq.util.ServiceStopper;
|
||||||
import org.apache.activemq.util.ServiceSupport;
|
import org.apache.activemq.util.ServiceSupport;
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import com.sun.tools.javac.tree.Tree.DoLoop;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
|
||||||
|
@ -128,17 +129,25 @@ public abstract class DemandForwardingBridgeSupport implements Bridge {
|
||||||
serviceRemoteException(error);
|
serviceRemoteException(error);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void transportInterupted(){
|
public synchronized void transportInterupted(){
|
||||||
//clear any subscriptions - to try and prevent the bridge from stalling the broker
|
//clear any subscriptions - to try and prevent the bridge from stalling the broker
|
||||||
log.warn("Outbound transport to " + remoteBrokerName + " interrupted ...");
|
log.warn("Outbound transport to " + remoteBrokerName + " interrupted ...");
|
||||||
clearDownSubscriptions();
|
clearDownSubscriptions();
|
||||||
|
doStopLocal();
|
||||||
|
startedLatch = new CountDownLatch(2);
|
||||||
|
try{
|
||||||
|
triggerLocalStartBridge();
|
||||||
|
}catch(IOException e){
|
||||||
|
log.warn("Caught exception from local start",e);
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public void transportResumed(){
|
public synchronized void transportResumed(){
|
||||||
//restart and static subscriptions - the consumer advisories will be replayed
|
//restart and static subscriptions - the consumer advisories will be replayed
|
||||||
log.info("Outbound transport to " + remoteBrokerName + " resumed");
|
log.info("Outbound transport to " + remoteBrokerName + " resumed");
|
||||||
setupStaticDestinations();
|
setupStaticDestinations();
|
||||||
|
startedLatch.countDown();
|
||||||
|
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
@ -244,7 +253,7 @@ public abstract class DemandForwardingBridgeSupport implements Bridge {
|
||||||
if(!disposed){
|
if(!disposed){
|
||||||
try{
|
try{
|
||||||
disposed=true;
|
disposed=true;
|
||||||
localBridgeStarted.set(false);
|
|
||||||
remoteBridgeStarted.set(false);
|
remoteBridgeStarted.set(false);
|
||||||
if(!shutDown){
|
if(!shutDown){
|
||||||
remoteBroker.oneway(new ShutdownInfo());
|
remoteBroker.oneway(new ShutdownInfo());
|
||||||
|
@ -268,6 +277,23 @@ public abstract class DemandForwardingBridgeSupport implements Bridge {
|
||||||
log.debug(localBrokerName+ " bridge to " + remoteBrokerName + " stopped");
|
log.debug(localBrokerName+ " bridge to " + remoteBrokerName + " stopped");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected void doStopLocal(){
|
||||||
|
try{
|
||||||
|
if(!shutDown){
|
||||||
|
if(localConnectionInfo!=null){
|
||||||
|
localBroker.oneway(localConnectionInfo.createRemoveCommand());
|
||||||
|
}
|
||||||
|
localBroker.oneway(new ShutdownInfo());
|
||||||
|
}
|
||||||
|
localBroker.setTransportListener(null);
|
||||||
|
}catch(IOException e){
|
||||||
|
log.debug("Caught exception stopping",e);
|
||||||
|
}finally{
|
||||||
|
ServiceStopper ss=new ServiceStopper();
|
||||||
|
ss.stop(localBroker);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
protected void serviceRemoteException(Exception error) {
|
protected void serviceRemoteException(Exception error) {
|
||||||
log.info("Network connection between "+localBroker+" and "+remoteBroker+" shutdown: "+error.getMessage(),error);
|
log.info("Network connection between "+localBroker+" and "+remoteBroker+" shutdown: "+error.getMessage(),error);
|
||||||
ServiceSupport.dispose(this);
|
ServiceSupport.dispose(this);
|
||||||
|
|
Loading…
Reference in New Issue