diff --git a/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java b/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java index 0b17a6c999..cf07f23b60 100644 --- a/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java +++ b/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java @@ -57,6 +57,7 @@ import org.apache.activemq.util.ServiceStopper; import org.apache.activemq.util.ServiceSupport; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import com.sun.tools.javac.tree.Tree.DoLoop; import java.io.IOException; @@ -128,17 +129,25 @@ public abstract class DemandForwardingBridgeSupport implements Bridge { serviceRemoteException(error); } - public void transportInterupted(){ + public synchronized void transportInterupted(){ //clear any subscriptions - to try and prevent the bridge from stalling the broker log.warn("Outbound transport to " + remoteBrokerName + " interrupted ..."); clearDownSubscriptions(); + doStopLocal(); + startedLatch = new CountDownLatch(2); + try{ + triggerLocalStartBridge(); + }catch(IOException e){ + log.warn("Caught exception from local start",e); + } } - public void transportResumed(){ + public synchronized void transportResumed(){ //restart and static subscriptions - the consumer advisories will be replayed log.info("Outbound transport to " + remoteBrokerName + " resumed"); setupStaticDestinations(); + startedLatch.countDown(); } }); @@ -244,7 +253,7 @@ public abstract class DemandForwardingBridgeSupport implements Bridge { if(!disposed){ try{ disposed=true; - localBridgeStarted.set(false); + remoteBridgeStarted.set(false); if(!shutDown){ remoteBroker.oneway(new ShutdownInfo()); @@ -267,6 +276,23 @@ public abstract class DemandForwardingBridgeSupport implements Bridge { } log.debug(localBrokerName+ " bridge to " + remoteBrokerName + " stopped"); } + + protected void doStopLocal(){ + try{ + if(!shutDown){ + if(localConnectionInfo!=null){ + localBroker.oneway(localConnectionInfo.createRemoveCommand()); + } + localBroker.oneway(new ShutdownInfo()); + } + localBroker.setTransportListener(null); + }catch(IOException e){ + log.debug("Caught exception stopping",e); + }finally{ + ServiceStopper ss=new ServiceStopper(); + ss.stop(localBroker); + } + } protected void serviceRemoteException(Exception error) { log.info("Network connection between "+localBroker+" and "+remoteBroker+" shutdown: "+error.getMessage(),error);