Hiram R. Chirino 2006-05-29 17:27:00 +00:00
parent 4a9418ec5a
commit 77e977ec57
2 changed files with 49 additions and 21 deletions

View File

@ -101,6 +101,7 @@ public abstract class DemandForwardingBridgeSupport implements Bridge {
protected boolean decreaseNetworkConsumerPriority;
protected boolean shutDown;
protected int networkTTL = 1;
protected final AtomicBoolean remoteInterupted = new AtomicBoolean(false);
public DemandForwardingBridgeSupport(final Transport localBroker, final Transport remoteBroker) {
@ -130,28 +131,49 @@ public abstract class DemandForwardingBridgeSupport implements Bridge {
public synchronized void transportInterupted(){
//clear any subscriptions - to try and prevent the bridge from stalling the broker
if( remoteInterupted.compareAndSet(false, true) ) {
log.warn("Outbound transport to " + remoteBrokerName + " interrupted ...");
clearDownSubscriptions();
doStopLocal();
startedLatch = new CountDownLatch(2);
try{
triggerLocalStartBridge();
localBroker.oneway(remoteConnectionInfo.createRemoveCommand());
}catch(IOException e){
log.warn("Caught exception from local start",e);
}
localBridgeStarted.set(false);
remoteBridgeStarted.set(false);
startedLatch = new CountDownLatch(2);
}
}
public synchronized void transportResumed(){
if( remoteInterupted.compareAndSet(true, false) ) {
//restart and static subscriptions - the consumer advisories will be replayed
log.info("Outbound transport to " + remoteBrokerName + " resumed");
setupStaticDestinations();
startedLatch.countDown();
// try{
// triggerLocalStartBridge();
// }catch(IOException e){
// log.warn("Caught exception from local start",e);
// }
try{
// clear out the previous connection as it may have missed some consumer advisories.
remoteBroker.oneway(remoteConnectionInfo.createRemoveCommand());
triggerRemoteStartBridge();
}catch(IOException e){
log.warn("Caught exception from remote start",e);
}
}
}
});
localBroker.start();
remoteBroker.start();
// triggerLocalStartBridge();
triggerRemoteStartBridge();
}
@ -160,7 +182,7 @@ public abstract class DemandForwardingBridgeSupport implements Bridge {
public void run(){
try{
startLocalBridge();
}catch(IOException e){
}catch(Exception e){
log.error("Failed to start network bridge: "+e,e);
}
}
@ -173,7 +195,7 @@ public abstract class DemandForwardingBridgeSupport implements Bridge {
public void run(){
try{
startRemoteBridge();
}catch(IOException e){
}catch(Exception e){
log.error("Failed to start network bridge: "+e,e);
}
}
@ -181,8 +203,9 @@ public abstract class DemandForwardingBridgeSupport implements Bridge {
thead.start();
}
protected void startLocalBridge() throws IOException {
protected void startLocalBridge() throws Exception {
if(localBridgeStarted.compareAndSet(false,true)){
localConnectionInfo=new ConnectionInfo();
localConnectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId()));
localClientId="NC_"+remoteBrokerName+"_inbound"+name;
@ -201,7 +224,7 @@ public abstract class DemandForwardingBridgeSupport implements Bridge {
}
}
protected void startRemoteBridge() throws IOException {
protected void startRemoteBridge() throws Exception {
if(remoteBridgeStarted.compareAndSet(false,true)){
remoteConnectionInfo=new ConnectionInfo();
@ -229,7 +252,7 @@ public abstract class DemandForwardingBridgeSupport implements Bridge {
+destinationFilter));
demandConsumerInfo.setPrefetchSize(prefetchSize);
remoteBroker.oneway(demandConsumerInfo);
//we want infomation about Destinations as well
//we want information about Destinations as well
ConsumerInfo destinationInfo = new ConsumerInfo(remoteSessionInfo,2);
destinationInfo.setDestination(AdvisorySupport.TEMP_DESTINATION_COMPOSITE_ADVISORY_TOPIC);
destinationInfo.setPrefetchSize(prefetchSize);
@ -290,6 +313,7 @@ public abstract class DemandForwardingBridgeSupport implements Bridge {
}finally{
ServiceStopper ss=new ServiceStopper();
ss.stop(localBroker);
localBridgeStarted.set(false);
}
}
@ -489,9 +513,13 @@ public abstract class DemandForwardingBridgeSupport implements Bridge {
serviceLocalBrokerInfo(command);
}else if(command.isShutdownInfo()){
log.info(localBrokerName+" Shutting down");
// Don't shut down the whole connector if the remote side was interrupted.
// the local transport is just shutting down temporarily until the remote side
// is restored.
if( !remoteInterupted.get() ) {
shutDown = true;
doStop();
}
}else{
switch(command.getDataStructureType()){

View File

@ -229,8 +229,8 @@ public class FailoverTransport implements CompositeTransport {
ServiceSupport.dispose(connectedTransport);
connectedTransport = null;
connectedTransportURI = null;
reconnectTask.wakeup();
}
reconnectTask.wakeup();
}
}