From 14fda24c872b7846667a062a358ba2ba8ff3174e Mon Sep 17 00:00:00 2001 From: "Hiram R. Chirino" Date: Fri, 26 May 2006 20:44:54 +0000 Subject: [PATCH] Fix for: http://issues.apache.org/activemq/browse/AMQ-726 git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@409742 13f79535-47bb-0310-9956-ffa450edef68 --- .../DemandForwardingBridgeSupport.java | 68 +++++++++++++------ .../transport/failover/FailoverTransport.java | 2 +- 2 files changed, 49 insertions(+), 21 deletions(-) diff --git a/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java b/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java index 552e0b4120..9a63c61e2f 100644 --- a/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java +++ b/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java @@ -101,6 +101,7 @@ public abstract class DemandForwardingBridgeSupport implements Bridge { protected boolean decreaseNetworkConsumerPriority; protected boolean shutDown; protected int networkTTL = 1; + protected final AtomicBoolean remoteInterupted = new AtomicBoolean(false); public DemandForwardingBridgeSupport(final Transport localBroker, final Transport remoteBroker) { @@ -130,28 +131,49 @@ public abstract class DemandForwardingBridgeSupport implements Bridge { public synchronized void transportInterupted(){ //clear any subscriptions - to try and prevent the bridge from stalling the broker - log.warn("Outbound transport to " + remoteBrokerName + " interrupted ..."); - clearDownSubscriptions(); - doStopLocal(); - startedLatch = new CountDownLatch(2); - try{ - triggerLocalStartBridge(); - }catch(IOException e){ - log.warn("Caught exception from local start",e); + if( remoteInterupted.compareAndSet(false, true) ) { + log.warn("Outbound transport to " + remoteBrokerName + " interrupted ..."); + clearDownSubscriptions(); + try{ + localBroker.oneway(remoteConnectionInfo.createRemoveCommand()); + }catch(IOException e){ + log.warn("Caught exception from local start",e); + } + localBridgeStarted.set(false); + remoteBridgeStarted.set(false); + startedLatch = new CountDownLatch(2); } } public synchronized void transportResumed(){ - //restart and static subscriptions - the consumer advisories will be replayed - log.info("Outbound transport to " + remoteBrokerName + " resumed"); - setupStaticDestinations(); - startedLatch.countDown(); + + if( remoteInterupted.compareAndSet(true, false) ) { + + //restart and static subscriptions - the consumer advisories will be replayed + log.info("Outbound transport to " + remoteBrokerName + " resumed"); + +// try{ +// triggerLocalStartBridge(); +// }catch(IOException e){ +// log.warn("Caught exception from local start",e); +// } + + try{ + // clear out the previous connection as it may have missed some consumer advisories. + remoteBroker.oneway(remoteConnectionInfo.createRemoveCommand()); + triggerRemoteStartBridge(); + }catch(IOException e){ + log.warn("Caught exception from remote start",e); + } + + } } }); localBroker.start(); remoteBroker.start(); +// triggerLocalStartBridge(); triggerRemoteStartBridge(); } @@ -160,7 +182,7 @@ public abstract class DemandForwardingBridgeSupport implements Bridge { public void run(){ try{ startLocalBridge(); - }catch(IOException e){ + }catch(Exception e){ log.error("Failed to start network bridge: "+e,e); } } @@ -173,7 +195,7 @@ public abstract class DemandForwardingBridgeSupport implements Bridge { public void run(){ try{ startRemoteBridge(); - }catch(IOException e){ + }catch(Exception e){ log.error("Failed to start network bridge: "+e,e); } } @@ -181,8 +203,9 @@ public abstract class DemandForwardingBridgeSupport implements Bridge { thead.start(); } - protected void startLocalBridge() throws IOException { + protected void startLocalBridge() throws Exception { if(localBridgeStarted.compareAndSet(false,true)){ + localConnectionInfo=new ConnectionInfo(); localConnectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId())); localClientId="NC_"+remoteBrokerName+"_inbound"+name; @@ -201,7 +224,7 @@ public abstract class DemandForwardingBridgeSupport implements Bridge { } } - protected void startRemoteBridge() throws IOException { + protected void startRemoteBridge() throws Exception { if(remoteBridgeStarted.compareAndSet(false,true)){ remoteConnectionInfo=new ConnectionInfo(); @@ -229,7 +252,7 @@ public abstract class DemandForwardingBridgeSupport implements Bridge { +destinationFilter)); demandConsumerInfo.setPrefetchSize(prefetchSize); remoteBroker.oneway(demandConsumerInfo); - //we want infomation about Destinations as well + //we want information about Destinations as well ConsumerInfo destinationInfo = new ConsumerInfo(remoteSessionInfo,2); destinationInfo.setDestination(AdvisorySupport.TEMP_DESTINATION_COMPOSITE_ADVISORY_TOPIC); destinationInfo.setPrefetchSize(prefetchSize); @@ -290,6 +313,7 @@ public abstract class DemandForwardingBridgeSupport implements Bridge { }finally{ ServiceStopper ss=new ServiceStopper(); ss.stop(localBroker); + localBridgeStarted.set(false); } } @@ -489,9 +513,13 @@ public abstract class DemandForwardingBridgeSupport implements Bridge { serviceLocalBrokerInfo(command); }else if(command.isShutdownInfo()){ log.info(localBrokerName+" Shutting down"); - shutDown = true; - doStop(); - + // Don't shut down the whole connector if the remote side was interrupted. + // the local transport is just shutting down temporarily until the remote side + // is restored. + if( !remoteInterupted.get() ) { + shutDown = true; + doStop(); + } }else{ switch(command.getDataStructureType()){ diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java b/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java index adbdefebaa..b13c7be863 100755 --- a/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java @@ -229,8 +229,8 @@ public class FailoverTransport implements CompositeTransport { ServiceSupport.dispose(connectedTransport); connectedTransport = null; connectedTransportURI = null; - reconnectTask.wakeup(); } + reconnectTask.wakeup(); } }