Fix threading problems in the DemandForwardingBridge.java

git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@374292 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Guillaume Nodet 2006-02-02 07:47:16 +00:00
parent d517eebfbc
commit 3a0358ee17
1 changed files with 14 additions and 3 deletions

View File

@ -16,7 +16,6 @@ package org.apache.activemq.network;
import java.io.IOException; import java.io.IOException;
import javax.jms.JMSException; import javax.jms.JMSException;
import org.apache.activemq.advisory.AdvisorySupport; import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.ActiveMQTopic; import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.BrokerId; import org.apache.activemq.command.BrokerId;
import org.apache.activemq.command.BrokerInfo; import org.apache.activemq.command.BrokerInfo;
@ -49,6 +48,7 @@ import org.apache.activemq.util.ServiceSupport;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap; import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
import edu.emory.mathcs.backport.java.util.concurrent.CountDownLatch;
import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean; import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
/** /**
* Forwards messages from the local broker to the remote broker based on demand. * Forwards messages from the local broker to the remote broker based on demand.
@ -94,6 +94,7 @@ public class DemandForwardingBridge implements Bridge{
ConcurrentHashMap subscriptionMapByRemoteId=new ConcurrentHashMap(); ConcurrentHashMap subscriptionMapByRemoteId=new ConcurrentHashMap();
protected final BrokerId localBrokerPath[]=new BrokerId[] { null }; protected final BrokerId localBrokerPath[]=new BrokerId[] { null };
protected final BrokerId remoteBrokerPath[]=new BrokerId[] { null }; protected final BrokerId remoteBrokerPath[]=new BrokerId[] { null };
private CountDownLatch startedLatch = new CountDownLatch(2);
public DemandForwardingBridge(Transport localBroker,Transport remoteBroker){ public DemandForwardingBridge(Transport localBroker,Transport remoteBroker){
this.localBroker=localBroker; this.localBroker=localBroker;
@ -162,6 +163,7 @@ public class DemandForwardingBridge implements Bridge{
localBroker.oneway(localSessionInfo); localBroker.oneway(localSessionInfo);
log.info("Network connection between "+localBroker+" and "+remoteBroker+"("+remoteBrokerName log.info("Network connection between "+localBroker+" and "+remoteBroker+"("+remoteBrokerName
+") has been established."); +") has been established.");
startedLatch.countDown();
} }
} }
@ -186,6 +188,7 @@ public class DemandForwardingBridge implements Bridge{
+destinationFilter)); +destinationFilter));
demandConsumerInfo.setPrefetchSize(prefetchSize); demandConsumerInfo.setPrefetchSize(prefetchSize);
remoteBroker.oneway(demandConsumerInfo); remoteBroker.oneway(demandConsumerInfo);
startedLatch.countDown();
} }
} }
@ -214,7 +217,7 @@ public class DemandForwardingBridge implements Bridge{
} }
} }
protected void serviceRemoteException(IOException error){ protected void serviceRemoteException(Exception error){
log.info("Network connection between "+localBroker+" and "+remoteBroker+" shutdown: "+error.getMessage(),error); log.info("Network connection between "+localBroker+" and "+remoteBroker+" shutdown: "+error.getMessage(),error);
ServiceSupport.dispose(this); ServiceSupport.dispose(this);
} }
@ -223,6 +226,7 @@ public class DemandForwardingBridge implements Bridge{
if(!disposed){ if(!disposed){
try{ try{
if(command.isMessageDispatch()){ if(command.isMessageDispatch()){
waitStarted();
MessageDispatch md=(MessageDispatch) command; MessageDispatch md=(MessageDispatch) command;
serviceRemoteConsumerAdvisory(md.getMessage().getDataStructure()); serviceRemoteConsumerAdvisory(md.getMessage().getDataStructure());
demandConsumerDispatched++; demandConsumerDispatched++;
@ -239,6 +243,7 @@ public class DemandForwardingBridge implements Bridge{
if(localBrokerId!=null){ if(localBrokerId!=null){
if(localBrokerId.equals(remoteBrokerId)){ if(localBrokerId.equals(remoteBrokerId)){
log.info("Disconnecting loop back connection."); log.info("Disconnecting loop back connection.");
waitStarted();
ServiceSupport.dispose(this); ServiceSupport.dispose(this);
}else{ }else{
triggerLocalStartBridge(); triggerLocalStartBridge();
@ -253,7 +258,7 @@ public class DemandForwardingBridge implements Bridge{
log.warn("Unexpected remote command: "+command); log.warn("Unexpected remote command: "+command);
} }
} }
}catch(IOException e){ }catch(Exception e){
serviceRemoteException(e); serviceRemoteException(e);
} }
} }
@ -343,6 +348,7 @@ public class DemandForwardingBridge implements Bridge{
final boolean trace=log.isTraceEnabled(); final boolean trace=log.isTraceEnabled();
try{ try{
if(command.isMessageDispatch()){ if(command.isMessageDispatch()){
waitStarted();
MessageDispatch md=(MessageDispatch) command; MessageDispatch md=(MessageDispatch) command;
Message message=md.getMessage(); Message message=md.getMessage();
DemandSubscription sub=(DemandSubscription) subscriptionMapByLocalId.get(md.getConsumerId()); DemandSubscription sub=(DemandSubscription) subscriptionMapByLocalId.get(md.getConsumerId());
@ -381,6 +387,7 @@ public class DemandForwardingBridge implements Bridge{
if(remoteBrokerId!=null){ if(remoteBrokerId!=null){
if(remoteBrokerId.equals(localBrokerId)){ if(remoteBrokerId.equals(localBrokerId)){
log.info("Disconnecting loop back connection."); log.info("Disconnecting loop back connection.");
waitStarted();
ServiceSupport.dispose(this); ServiceSupport.dispose(this);
} }
} }
@ -460,4 +467,8 @@ public class DemandForwardingBridge implements Bridge{
System.arraycopy(pathsToAppend,0,rc,brokerPath.length,pathsToAppend.length); System.arraycopy(pathsToAppend,0,rc,brokerPath.length,pathsToAppend.length);
return rc; return rc;
} }
private void waitStarted() throws InterruptedException {
startedLatch.await();
}
} }