git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@560696 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2007-07-29 08:57:31 +00:00
parent 50c6d9ccc8
commit 35ba427ec9
2 changed files with 274 additions and 229 deletions

View File

@ -15,6 +15,7 @@
package org.apache.activemq.broker;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@ -29,7 +30,6 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.activemq.Service;
import org.apache.activemq.broker.ft.MasterBroker;
import org.apache.activemq.broker.region.ConnectionStatistics;
@ -80,11 +80,13 @@ import org.apache.activemq.thread.Task;
import org.apache.activemq.thread.TaskRunner;
import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.activemq.transport.DefaultTransportListener;
import org.apache.activemq.transport.ResponseCorrelator;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportFactory;
import org.apache.activemq.util.IntrospectionSupport;
import org.apache.activemq.util.MarshallingSupport;
import org.apache.activemq.util.ServiceSupport;
import org.apache.activemq.util.URISupport;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -1090,12 +1092,21 @@ public class TransportConnection implements Service,Connection,Task,CommandVisit
NetworkBridgeConfiguration config = new NetworkBridgeConfiguration();
IntrospectionSupport.setProperties(config,props,"");
config.setBrokerName(broker.getBrokerName());
Transport localTransport = TransportFactory.connect(broker.getVmConnectorURI());
duplexBridge = NetworkBridgeFactory.createBridge(config,localTransport,transport);
URI uri = broker.getVmConnectorURI();
HashMap map = new HashMap(URISupport.parseParamters(uri));
map.put("network", "true");
map.put("async","false");
uri = URISupport.createURIWithQuery(uri, URISupport.createQueryString(map));
Transport localTransport = TransportFactory.connect(uri);
Transport remoteBridgeTransport = new ResponseCorrelator(transport);
duplexBridge = NetworkBridgeFactory.createBridge(config,localTransport,remoteBridgeTransport);
//now turn duplex off this side
info.setDuplexConnection(false);
duplexBridge.setCreatedByDuplex(true);
duplexBridge.start();
duplexBridge.duplexStart(brokerInfo,info);
log.info("Created Duplex Bridge back to " + info.getBrokerName());
return null;
}catch(Exception e){
log.error("Creating duplex network bridge",e);
}
@ -1103,7 +1114,6 @@ public class TransportConnection implements Service,Connection,Task,CommandVisit
// We only expect to get one broker info command per connection
if(this.brokerInfo!=null){
log.warn("Unexpected extra broker info command received: "+info);
Thread.dumpStack();
}
this.brokerInfo=info;
broker.addBroker(this,info);

View File

@ -27,6 +27,7 @@ import java.util.concurrent.atomic.AtomicLong;
import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQTempDestination;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.BrokerId;
@ -110,6 +111,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge {
final AtomicLong enqueueCounter = new AtomicLong();
final AtomicLong dequeueCounter = new AtomicLong();
private AtomicBoolean started = new AtomicBoolean();
public DemandForwardingBridgeSupport(NetworkBridgeConfiguration configuration, Transport localBroker, Transport remoteBroker) {
this.configuration=configuration;
@ -117,8 +119,17 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge {
this.remoteBroker = remoteBroker;
}
public void duplexStart(BrokerInfo localBrokerInfo,BrokerInfo remoteBrokerInfo) throws Exception{
this.localBrokerInfo=localBrokerInfo;
this.remoteBrokerInfo=remoteBrokerInfo;
start();
serviceRemoteCommand(remoteBrokerInfo);
}
public void start() throws Exception{
if(started.compareAndSet(false,true)){
localBroker.setTransportListener(new DefaultTransportListener(){
public void onCommand(Object o){
Command command=(Command)o;
serviceLocalCommand(command);
@ -129,6 +140,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge {
}
});
remoteBroker.setTransportListener(new TransportListener(){
public void onCommand(Object o){
Command command=(Command)o;
serviceRemoteCommand(command);
@ -141,9 +153,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge {
public void transportInterupted(){
// clear any subscriptions - to try and prevent the bridge from stalling the broker
if(remoteInterupted.compareAndSet(false,true)){
log.info("Outbound transport to "+remoteBrokerName+" interrupted.");
if(localBridgeStarted.get()){
clearDownSubscriptions();
synchronized(DemandForwardingBridgeSupport.this){
@ -151,28 +161,25 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge {
localBroker.oneway(localConnectionInfo.createRemoveCommand());
}catch(TransportDisposedIOException td){
log.debug("local broker is now disposed",td);
}
catch(IOException e){
}catch(IOException e){
log.warn("Caught exception from local start",e);
}
}
}
localBridgeStarted.set(false);
remoteBridgeStarted.set(false);
startedLatch=new CountDownLatch(2);
}
}
public void transportResumed(){
if(remoteInterupted.compareAndSet(true,false)){
// We want to slow down false connects so that we don't get in a busy loop.
// False connects can occurr if you using SSH tunnels.
if(!lastConnectSucceeded.get()){
try{
log.debug("Previous connection was never fully established. Sleeping for second to avoid busy loop.");
log
.debug("Previous connection was never fully established. Sleeping for second to avoid busy loop.");
Thread.sleep(1000);
}catch(InterruptedException e){
Thread.currentThread().interrupt();
@ -187,24 +194,22 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge {
}catch(Exception e){
log.error("Caught exception from local start in resume transport",e);
}
}
}
});
localBroker.start();
remoteBroker.start();
try{
triggerRemoteStartBridge();
}catch(IOException e){
log.warn("Caught exception from remote start",e);
}
NetworkBridgeListener l=this.networkBridgeListener;
if(l!=null){
l.onStart(this);
}
}
}
protected void triggerLocalStartBridge() throws IOException {
@ -261,25 +266,12 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge {
protected void startRemoteBridge() throws Exception{
if(remoteBridgeStarted.compareAndSet(false,true)){
synchronized(this){
if( remoteConnectionInfo!=null ) {
remoteBroker.oneway(remoteConnectionInfo.createRemoveCommand());
}
remoteConnectionInfo=new ConnectionInfo();
remoteConnectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId()));
remoteConnectionInfo.setClientId("NC_"+configuration.getBrokerName()+"_outbound");
remoteConnectionInfo.setUserName(configuration.getUserName());
remoteConnectionInfo.setPassword(configuration.getPassword());
remoteBroker.oneway(remoteConnectionInfo);
if(isCreatedByDuplex()==false){
BrokerInfo brokerInfo=new BrokerInfo();
brokerInfo.setBrokerName(configuration.getBrokerName());
brokerInfo.setNetworkConnection(true);
brokerInfo.setDuplexConnection(configuration.isDuplex());
// set our properties
Properties props=new Properties();
IntrospectionSupport.getProperties(this,props,null);
@ -287,18 +279,26 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge {
brokerInfo.setNetworkProperties(str);
remoteBroker.oneway(brokerInfo);
}
if(remoteConnectionInfo!=null){
remoteBroker.oneway(remoteConnectionInfo.createRemoveCommand());
}
remoteConnectionInfo=new ConnectionInfo();
remoteConnectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId()));
remoteConnectionInfo.setClientId("NC_"+configuration.getBrokerName()+"_outbound");
remoteConnectionInfo.setUserName(configuration.getUserName());
remoteConnectionInfo.setPassword(configuration.getPassword());
remoteBroker.oneway(remoteConnectionInfo);
SessionInfo remoteSessionInfo=new SessionInfo(remoteConnectionInfo,1);
remoteBroker.oneway(remoteSessionInfo);
producerInfo=new ProducerInfo(remoteSessionInfo,1);
producerInfo.setResponseRequired(false);
remoteBroker.oneway(producerInfo);
// Listen to consumer advisory messages on the remote broker to determine demand.
demandConsumerInfo=new ConsumerInfo(remoteSessionInfo,1);
demandConsumerInfo.setDispatchAsync(configuration.isDispatchAsync());
String advisoryTopic = AdvisorySupport.CONSUMER_ADVISORY_TOPIC_PREFIX+configuration.getDestinationFilter();
String advisoryTopic=AdvisorySupport.CONSUMER_ADVISORY_TOPIC_PREFIX
+configuration.getDestinationFilter();
if(configuration.isBridgeTempDestinations()){
advisoryTopic+=","+AdvisorySupport.TEMP_DESTINATION_COMPOSITE_ADVISORY_TOPIC;
}
@ -306,24 +306,23 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge {
demandConsumerInfo.setPrefetchSize(configuration.getPrefetchSize());
remoteBroker.oneway(demandConsumerInfo);
startedLatch.countDown();
if(!disposed){
triggerLocalStartBridge();
}
}
}
}
public void stop() throws Exception{
log.debug(" stopping "+configuration.getBrokerName()+" bridge to "+remoteBrokerName+" is disposed already ? "+disposed);
if(started.compareAndSet(true,false)){
log.debug(" stopping "+configuration.getBrokerName()+" bridge to "+remoteBrokerName
+" is disposed already ? "+disposed);
boolean wasDisposedAlready=disposed;
if(!disposed){
NetworkBridgeListener l=this.networkBridgeListener;
if(l!=null){
l.onStop(this);
}
try{
disposed=true;
remoteBridgeStarted.set(false);
@ -335,11 +334,9 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge {
ServiceStopper ss=new ServiceStopper();
ss.stop(localBroker);
ss.stop(remoteBroker);
// Release the started Latch since another thread could be stuck waiting for it to start up.
startedLatch.countDown();
startedLatch.countDown();
ss.throwFirstException();
}
}
@ -349,6 +346,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge {
log.info(configuration.getBrokerName()+" bridge to "+remoteBrokerName+" stopped");
}
}
}
public void serviceRemoteException(Throwable error){
if(!disposed){
@ -383,17 +381,41 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge {
demandConsumerDispatched=0;
}
}else if(command.isBrokerInfo()){
lastConnectSucceeded.set(true);
remoteBrokerInfo=((BrokerInfo)command);
serviceRemoteBrokerInfo(command);
// Let the local broker know the remote broker's ID.
localBroker.oneway(command);
}else if(command.getClass()==ConnectionError.class){
ConnectionError ce=(ConnectionError)command;
serviceRemoteException(ce.getException());
}else{
if(configuration.isDuplex()||createdByDuplex){
if(command.isMessage()){
ActiveMQMessage message=(ActiveMQMessage)command;
if(AdvisorySupport.isConsumerAdvisoryTopic(message.getDestination())){
serviceRemoteConsumerAdvisory(message.getDataStructure());
}else{
localBroker.oneway(message);
}
}else{
switch(command.getDataStructureType()){
case ConnectionInfo.DATA_STRUCTURE_TYPE:
case SessionInfo.DATA_STRUCTURE_TYPE:
case ProducerInfo.DATA_STRUCTURE_TYPE:
localBroker.oneway(command);
break;
case ConsumerInfo.DATA_STRUCTURE_TYPE:
if(!addConsumerInfo((ConsumerInfo)command)){
if(log.isDebugEnabled())
log.debug("Ignoring ConsumerInfo: "+command);
}
break;
default:
if(log.isDebugEnabled())
log.debug("Ignoring remote command: "+command);
}
}
}else{
switch(command.getDataStructureType()){
case KeepAliveInfo.DATA_STRUCTURE_TYPE:
@ -404,14 +426,14 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge {
log.warn("Unexpected remote command: "+command);
}
}
}catch(Exception e){
}
}catch(Throwable e){
serviceRemoteException(e);
}
}
}
private void serviceRemoteConsumerAdvisory(DataStructure data) throws IOException{
final int networkTTL=configuration.getNetworkTTL();
if(data.getClass()==ConsumerInfo.class){
// Create a new local subscription
@ -419,35 +441,34 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge {
BrokerId[] path=info.getBrokerPath();
if((path!=null&&path.length>=networkTTL)){
if(log.isDebugEnabled())
log.debug(configuration.getBrokerName() + " Ignoring Subscription " + info + " restricted to " + networkTTL + " network hops only");
log.debug(configuration.getBrokerName()+" Ignoring Subscription "+info+" restricted to "+networkTTL
+" network hops only");
return;
}
if(contains(info.getBrokerPath(),localBrokerPath[0])){
// Ignore this consumer as it's a consumer we locally sent to the broker.
if(log.isDebugEnabled())
log.debug(configuration.getBrokerName() + " Ignoring sub " + info + " already routed through this broker once");
log.debug(configuration.getBrokerName()+" Ignoring sub "+info
+" already routed through this broker once");
return;
}
if(!isPermissableDestination(info.getDestination())){
// ignore if not in the permited or in the excluded list
if(log.isDebugEnabled())
log.debug(configuration.getBrokerName() + " Ignoring sub " + info + " destination " + info.getDestination() + " is not permiited");
log.debug(configuration.getBrokerName()+" Ignoring sub "+info+" destination "+info.getDestination()
+" is not permiited");
return;
}
// Update the packet to show where it came from.
info=info.copy();
addRemoteBrokerToBrokerPath(info);
DemandSubscription sub=createDemandSubscription(info);
if (sub != null){
addSubscription(sub);
if(addConsumerInfo(info)){
if(log.isDebugEnabled())
log.debug(configuration.getBrokerName() + " Forwarding sub on "+localBroker+" from "+remoteBrokerName+" : "+info);
log.debug(configuration.getBrokerName()+" Forwarding sub on "+localBroker+" from "+remoteBrokerName
+" : "+info);
}else{
if(log.isDebugEnabled())
log.debug(configuration.getBrokerName() + " Ignoring sub " + info + " already subscribed to matching destination");
log.debug(configuration.getBrokerName()+" Ignoring sub "+info
+" already subscribed to matching destination");
}
}
else if (data.getClass()==DestinationInfo.class){
}else if(data.getClass()==DestinationInfo.class){
// It's a destination info - we want to pass up
// infomation about temporary destinations
DestinationInfo destInfo=(DestinationInfo)data;
@ -463,21 +484,16 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge {
log.debug("Ignoring sub "+destInfo+" already routed through this broker once");
return;
}
destInfo.setConnectionId(localConnectionInfo.getConnectionId());
if(destInfo.getDestination() instanceof ActiveMQTempDestination){
// re-set connection id so comes from here
ActiveMQTempDestination tempDest=(ActiveMQTempDestination)destInfo.getDestination();
tempDest.setConnectionId(localSessionInfo.getSessionId().getConnectionId());
}
destInfo.setBrokerPath(appendToBrokerPath(destInfo.getBrokerPath(),getRemoteBrokerPath()));
log.debug("Replying destination control command: "+destInfo);
localBroker.oneway(destInfo);
}
else if(data.getClass()==RemoveInfo.class){
}else if(data.getClass()==RemoveInfo.class){
ConsumerId id=(ConsumerId)((RemoveInfo)data).getObjectId();
removeDemandSubscription(id);
}
@ -485,9 +501,11 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge {
public void serviceLocalException(Throwable error){
if(!disposed){
log.info("Network connection between "+localBroker+" and "+remoteBroker+" shutdown due to a local error: "+error);
log.info("Network connection between "+localBroker+" and "+remoteBroker+" shutdown due to a local error: "
+error);
log.debug("The local Exception was:"+error,error);
new Thread(){
public void run(){
ServiceSupport.dispose(DemandForwardingBridgeSupport.this);
}
@ -742,7 +760,6 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge {
}
return false;
}
return true;
}
@ -767,6 +784,18 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge {
}
}
protected boolean addConsumerInfo(final ConsumerInfo consumerInfo) throws IOException {
boolean result = false;
ConsumerInfo info=consumerInfo.copy();
addRemoteBrokerToBrokerPath(info);
DemandSubscription sub=createDemandSubscription(info);
if (sub != null){
addSubscription(sub);
result = true;
}
return result;
}
protected DemandSubscription createDemandSubscription(ConsumerInfo info) throws IOException {
return doCreateDemandSubscription(info);
}
@ -775,6 +804,12 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge {
DemandSubscription result=new DemandSubscription(info);
result.getLocalInfo().setConsumerId(new ConsumerId(localSessionInfo.getSessionId(),consumerIdGenerator
.getNextSequenceId()));
if (info.getDestination().isTemporary()) {
//reset the local connection Id
ActiveMQTempDestination dest = (ActiveMQTempDestination)result.getLocalInfo().getDestination();
dest.setConnectionId(localConnectionInfo.getConnectionId().toString());
}
if( configuration.isDecreaseNetworkConsumerPriority() ) {
byte priority=ConsumerInfo.NETWORK_CONSUMER_PRIORITY;