git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@409742 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Hiram R. Chirino 2006-05-26 20:44:54 +00:00
parent 10edc94224
commit 14fda24c87
2 changed files with 49 additions and 21 deletions

View File

@ -101,6 +101,7 @@ public abstract class DemandForwardingBridgeSupport implements Bridge {
protected boolean decreaseNetworkConsumerPriority; protected boolean decreaseNetworkConsumerPriority;
protected boolean shutDown; protected boolean shutDown;
protected int networkTTL = 1; protected int networkTTL = 1;
protected final AtomicBoolean remoteInterupted = new AtomicBoolean(false);
public DemandForwardingBridgeSupport(final Transport localBroker, final Transport remoteBroker) { public DemandForwardingBridgeSupport(final Transport localBroker, final Transport remoteBroker) {
@ -130,28 +131,49 @@ public abstract class DemandForwardingBridgeSupport implements Bridge {
public synchronized void transportInterupted(){ public synchronized void transportInterupted(){
//clear any subscriptions - to try and prevent the bridge from stalling the broker //clear any subscriptions - to try and prevent the bridge from stalling the broker
if( remoteInterupted.compareAndSet(false, true) ) {
log.warn("Outbound transport to " + remoteBrokerName + " interrupted ..."); log.warn("Outbound transport to " + remoteBrokerName + " interrupted ...");
clearDownSubscriptions(); clearDownSubscriptions();
doStopLocal();
startedLatch = new CountDownLatch(2);
try{ try{
triggerLocalStartBridge(); localBroker.oneway(remoteConnectionInfo.createRemoveCommand());
}catch(IOException e){ }catch(IOException e){
log.warn("Caught exception from local start",e); log.warn("Caught exception from local start",e);
} }
localBridgeStarted.set(false);
remoteBridgeStarted.set(false);
startedLatch = new CountDownLatch(2);
}
} }
public synchronized void transportResumed(){ public synchronized void transportResumed(){
if( remoteInterupted.compareAndSet(true, false) ) {
//restart and static subscriptions - the consumer advisories will be replayed //restart and static subscriptions - the consumer advisories will be replayed
log.info("Outbound transport to " + remoteBrokerName + " resumed"); log.info("Outbound transport to " + remoteBrokerName + " resumed");
setupStaticDestinations();
startedLatch.countDown(); // try{
// triggerLocalStartBridge();
// }catch(IOException e){
// log.warn("Caught exception from local start",e);
// }
try{
// clear out the previous connection as it may have missed some consumer advisories.
remoteBroker.oneway(remoteConnectionInfo.createRemoveCommand());
triggerRemoteStartBridge();
}catch(IOException e){
log.warn("Caught exception from remote start",e);
}
}
} }
}); });
localBroker.start(); localBroker.start();
remoteBroker.start(); remoteBroker.start();
// triggerLocalStartBridge();
triggerRemoteStartBridge(); triggerRemoteStartBridge();
} }
@ -160,7 +182,7 @@ public abstract class DemandForwardingBridgeSupport implements Bridge {
public void run(){ public void run(){
try{ try{
startLocalBridge(); startLocalBridge();
}catch(IOException e){ }catch(Exception e){
log.error("Failed to start network bridge: "+e,e); log.error("Failed to start network bridge: "+e,e);
} }
} }
@ -173,7 +195,7 @@ public abstract class DemandForwardingBridgeSupport implements Bridge {
public void run(){ public void run(){
try{ try{
startRemoteBridge(); startRemoteBridge();
}catch(IOException e){ }catch(Exception e){
log.error("Failed to start network bridge: "+e,e); log.error("Failed to start network bridge: "+e,e);
} }
} }
@ -181,8 +203,9 @@ public abstract class DemandForwardingBridgeSupport implements Bridge {
thead.start(); thead.start();
} }
protected void startLocalBridge() throws IOException { protected void startLocalBridge() throws Exception {
if(localBridgeStarted.compareAndSet(false,true)){ if(localBridgeStarted.compareAndSet(false,true)){
localConnectionInfo=new ConnectionInfo(); localConnectionInfo=new ConnectionInfo();
localConnectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId())); localConnectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId()));
localClientId="NC_"+remoteBrokerName+"_inbound"+name; localClientId="NC_"+remoteBrokerName+"_inbound"+name;
@ -201,7 +224,7 @@ public abstract class DemandForwardingBridgeSupport implements Bridge {
} }
} }
protected void startRemoteBridge() throws IOException { protected void startRemoteBridge() throws Exception {
if(remoteBridgeStarted.compareAndSet(false,true)){ if(remoteBridgeStarted.compareAndSet(false,true)){
remoteConnectionInfo=new ConnectionInfo(); remoteConnectionInfo=new ConnectionInfo();
@ -229,7 +252,7 @@ public abstract class DemandForwardingBridgeSupport implements Bridge {
+destinationFilter)); +destinationFilter));
demandConsumerInfo.setPrefetchSize(prefetchSize); demandConsumerInfo.setPrefetchSize(prefetchSize);
remoteBroker.oneway(demandConsumerInfo); remoteBroker.oneway(demandConsumerInfo);
//we want infomation about Destinations as well //we want information about Destinations as well
ConsumerInfo destinationInfo = new ConsumerInfo(remoteSessionInfo,2); ConsumerInfo destinationInfo = new ConsumerInfo(remoteSessionInfo,2);
destinationInfo.setDestination(AdvisorySupport.TEMP_DESTINATION_COMPOSITE_ADVISORY_TOPIC); destinationInfo.setDestination(AdvisorySupport.TEMP_DESTINATION_COMPOSITE_ADVISORY_TOPIC);
destinationInfo.setPrefetchSize(prefetchSize); destinationInfo.setPrefetchSize(prefetchSize);
@ -290,6 +313,7 @@ public abstract class DemandForwardingBridgeSupport implements Bridge {
}finally{ }finally{
ServiceStopper ss=new ServiceStopper(); ServiceStopper ss=new ServiceStopper();
ss.stop(localBroker); ss.stop(localBroker);
localBridgeStarted.set(false);
} }
} }
@ -489,9 +513,13 @@ public abstract class DemandForwardingBridgeSupport implements Bridge {
serviceLocalBrokerInfo(command); serviceLocalBrokerInfo(command);
}else if(command.isShutdownInfo()){ }else if(command.isShutdownInfo()){
log.info(localBrokerName+" Shutting down"); log.info(localBrokerName+" Shutting down");
// Don't shut down the whole connector if the remote side was interrupted.
// the local transport is just shutting down temporarily until the remote side
// is restored.
if( !remoteInterupted.get() ) {
shutDown = true; shutDown = true;
doStop(); doStop();
}
}else{ }else{
switch(command.getDataStructureType()){ switch(command.getDataStructureType()){

View File

@ -229,8 +229,8 @@ public class FailoverTransport implements CompositeTransport {
ServiceSupport.dispose(connectedTransport); ServiceSupport.dispose(connectedTransport);
connectedTransport = null; connectedTransport = null;
connectedTransportURI = null; connectedTransportURI = null;
reconnectTask.wakeup();
} }
reconnectTask.wakeup();
} }
} }