remove await latch call if closing a loop back network connector - this

should 'fix' - hanging SpringTest.

git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@378967 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2006-02-19 22:13:54 +00:00
parent 8610db3e14
commit 97fff421f3
1 changed files with 71 additions and 15 deletions

View File

@ -42,6 +42,7 @@ import org.apache.activemq.filter.DestinationFilter;
import org.apache.activemq.filter.MessageEvaluationContext;
import org.apache.activemq.transport.DefaultTransportListener;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportListener;
import org.apache.activemq.util.IdGenerator;
import org.apache.activemq.util.JMSExceptionSupport;
import org.apache.activemq.util.LongSequenceGenerator;
@ -93,6 +94,7 @@ public class DemandForwardingBridge implements Bridge{
protected CountDownLatch startedLatch = new CountDownLatch(2);
protected Object brokerInfoMutex = new Object();
protected boolean decreaseNetworkConsumerPriority;
protected boolean shutDown;
protected int networkTTL = 1;
@ -113,7 +115,7 @@ public class DemandForwardingBridge implements Bridge{
serviceLocalException(error);
}
});
remoteBroker.setTransportListener(new DefaultTransportListener(){
remoteBroker.setTransportListener(new TransportListener(){
public void onCommand(Command command){
serviceRemoteCommand(command);
}
@ -121,6 +123,20 @@ public class DemandForwardingBridge implements Bridge{
public void onException(IOException error){
serviceRemoteException(error);
}
public void transportInterupted(){
//clear any subscriptions - to try and prevent the bridge from stalling the broker
log.warn("Outbound transport to " + remoteBrokerName + " interrupted ...");
clearDownSubscriptions();
}
public void transportResumed(){
//restart and static subscriptions - the consumer advisories will be replayed
log.info("Outbound transport to " + remoteBrokerName + " resumed");
setupStaticDestinations();
}
});
localBroker.start();
remoteBroker.start();
@ -195,25 +211,32 @@ public class DemandForwardingBridge implements Bridge{
}
public void stop() throws Exception{
shutDown = true;
doStop();
}
/**
* stop the bridge
* @throws Exception
*/
public void stop() throws Exception{
protected void doStop() throws Exception{
log.debug(" stopping "+localBrokerName+ " bridge to " + remoteBrokerName + " is disposed already ? "+disposed);
if(!disposed){
try{
disposed=true;
localBridgeStarted.set(false);
remoteBridgeStarted.set(false);
if(!shutDown){
remoteBroker.oneway(new ShutdownInfo());
if(localConnectionInfo!=null){
localBroker.request(localConnectionInfo.createRemoveCommand());
remoteBroker.request(remoteConnectionInfo.createRemoveCommand());
localBroker.oneway(localConnectionInfo.createRemoveCommand());
remoteBroker.oneway(remoteConnectionInfo.createRemoveCommand());
}
localBroker.oneway(new ShutdownInfo());
}
localBroker.setTransportListener(null);
remoteBroker.setTransportListener(null);
remoteBroker.oneway(new ShutdownInfo());
localBroker.oneway(new ShutdownInfo());
}catch(IOException e){
log.debug("Caught exception stopping",e);
}finally{
@ -223,6 +246,7 @@ public class DemandForwardingBridge implements Bridge{
ss.throwFirstException();
}
}
log.debug(localBrokerName+ " bridge to " + remoteBrokerName + " stopped");
}
protected void serviceRemoteException(Exception error){
@ -251,7 +275,7 @@ public class DemandForwardingBridge implements Bridge{
if(localBrokerId!=null){
if(localBrokerId.equals(remoteBrokerId)){
log.info("Disconnecting loop back connection.");
waitStarted();
//waitStarted();
ServiceSupport.dispose(this);
}
}
@ -345,7 +369,6 @@ public class DemandForwardingBridge implements Bridge{
if(message.getOriginalTransactionId()==null)
message.setOriginalTransactionId(message.getTransactionId());
message.setTransactionId(null);
message.setRecievedByDFBridge(true);
message.evictMarshlledForm();
return message;
}
@ -393,8 +416,10 @@ public class DemandForwardingBridge implements Bridge{
}
}else if(command.isShutdownInfo()){
log.info(localBrokerName+" Shutting down");
disposed = true;
stop();
shutDown = true;
doStop();
}else{
switch(command.getDataStructureType()){
case WireFormatInfo.DATA_STRUCTURE_TYPE:
@ -568,6 +593,21 @@ public class DemandForwardingBridge implements Bridge{
this.networkTTL=networkTTL;
}
/**
* @return Returns the shutDown.
*/
public boolean isShutDown(){
return shutDown;
}
/**
* @param shutDown The shutDown to set.
*/
public void setShutDown(boolean shutDown){
this.shutDown=shutDown;
}
private boolean contains(BrokerId[] brokerPath,BrokerId brokerId){
if(brokerPath!=null){
@ -616,16 +656,19 @@ public class DemandForwardingBridge implements Bridge{
/**
* Subscriptions for these desitnations are always created
* @throws IOException
*
*/
protected void setupStaticDestinations() throws IOException{
protected void setupStaticDestinations(){
ActiveMQDestination[] dests = staticallyIncludedDestinations;
if (dests != null){
for(int i=0;i<dests.length;i++){
ActiveMQDestination dest=dests[i];
DemandSubscription sub = createDemandSubscription(dest);
try{
addSubscription(sub);
}catch(IOException e){
log.error("Failed to add static destination " + dest,e);
}
if(log.isTraceEnabled())
log.trace("Forwarding messages for static destination: " + dest);
}
@ -633,6 +676,10 @@ public class DemandForwardingBridge implements Bridge{
}
protected DemandSubscription createDemandSubscription(ConsumerInfo info){
return doCreateDemandSubscription(info);
}
protected DemandSubscription doCreateDemandSubscription(ConsumerInfo info){
DemandSubscription result=new DemandSubscription(info);
result.getLocalInfo().setConsumerId(new ConsumerId(localSessionInfo.getSessionId(),consumerIdGenerator
.getNextSequenceId()));
@ -711,7 +758,11 @@ public class DemandForwardingBridge implements Bridge{
if(message.isAdvisory()&&message.getDataStructure()!=null
&&message.getDataStructure().getDataStructureType()==CommandTypes.CONSUMER_INFO){
ConsumerInfo info=(ConsumerInfo) message.getDataStructure();
if(info.isNetworkSubscription()){
hops = info.getBrokerPath() == null ? 0 : message.getBrokerPath().length;
if(hops >= networkTTL){
if (log.isTraceEnabled()){
log.trace("ConsumerInfo advisory restricted to " + networkTTL + " network hops ignoring: " + message);
}
return false;
}
}
@ -722,6 +773,11 @@ public class DemandForwardingBridge implements Bridge{
startedLatch.await();
}
protected void clearDownSubscriptions(){
}