git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@517753 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2007-03-13 16:19:58 +00:00
parent 1d9737d577
commit ae73f860b9
25 changed files with 644 additions and 500 deletions

View File

@ -69,6 +69,7 @@ import org.apache.activemq.command.TransactionInfo;
import org.apache.activemq.command.WireFormatInfo;
import org.apache.activemq.network.DemandForwardingBridge;
import org.apache.activemq.network.NetworkBridgeConfiguration;
import org.apache.activemq.network.NetworkBridgeFactory;
import org.apache.activemq.security.MessageAuthorizationPolicy;
import org.apache.activemq.state.CommandVisitor;
import org.apache.activemq.state.ConsumerState;
@ -80,6 +81,7 @@ import org.apache.activemq.thread.TaskRunner;
import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.activemq.transport.DefaultTransportListener;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportFactory;
import org.apache.activemq.util.IntrospectionSupport;
import org.apache.activemq.util.MarshallingSupport;
import org.apache.activemq.util.ServiceSupport;
@ -871,6 +873,9 @@ public class TransportConnection implements Service,Connection,Task,CommandVisit
if(masterBroker!=null){
masterBroker.stop();
}
if (duplexBridge != null) {
duplexBridge.stop();
}
// If the transport has not failed yet,
// notify the peer that we are doing a normal shutdown.
if(transportException==null){
@ -1081,9 +1086,10 @@ public class TransportConnection implements Service,Connection,Task,CommandVisit
NetworkBridgeConfiguration config = new NetworkBridgeConfiguration();
IntrospectionSupport.setProperties(config,props,null);
config.setLocalBrokerName(broker.getBrokerName());
}catch(IOException e){
Transport localTransport = TransportFactory.connect(broker.getVmConnectorURI());
localTransport.start();
duplexBridge = NetworkBridgeFactory.createBridge(config,localTransport,transport);
}catch(Exception e){
log.error("Creating duplex network bridge",e);
}
}

View File

@ -39,6 +39,7 @@ public class BrokerInfo extends BaseCommand{
String brokerName;
long connectionId;
String brokerUploadUrl;
String networkProperties;
public boolean isBrokerInfo(){
@ -199,4 +200,21 @@ public class BrokerInfo extends BaseCommand{
public void setBrokerUploadUrl(String brokerUploadUrl) {
this.brokerUploadUrl = brokerUploadUrl;
}
/**
* @openwire:property version=3 cache=false
* @return the networkProperties
*/
public String getNetworkProperties(){
return this.networkProperties;
}
/**
* @param networkProperties the networkProperties to set
*/
public void setNetworkProperties(String networkProperties){
this.networkProperties=networkProperties;
}
}

View File

@ -41,8 +41,8 @@ public class CompositeDemandForwardingBridge extends DemandForwardingBridgeSuppo
protected final BrokerId remoteBrokerPath[] = new BrokerId[] { null };
protected Object brokerInfoMutex = new Object();
public CompositeDemandForwardingBridge(Transport localBroker, Transport remoteBroker) {
super(localBroker, remoteBroker);
public CompositeDemandForwardingBridge(NetworkBridgeConfiguration configuration,Transport localBroker, Transport remoteBroker) {
super(configuration,localBroker, remoteBroker);
remoteBrokerName = remoteBroker.toString();
remoteBrokerNameKnownLatch.countDown();
}
@ -102,7 +102,7 @@ public class CompositeDemandForwardingBridge extends DemandForwardingBridgeSuppo
}
protected NetworkBridgeFilter createNetworkBridgeFilter(ConsumerInfo info) throws IOException {
return new NetworkBridgeFilter(getFromBrokerId(info), networkTTL);
return new NetworkBridgeFilter(getFromBrokerId(info), configuration.getNetworkTTL());
}
protected BrokerId[] getRemoteBrokerPath(){

View File

@ -42,8 +42,8 @@ public class ConduitBridge extends DemandForwardingBridge{
* @param localBroker
* @param remoteBroker
*/
public ConduitBridge(Transport localBroker,Transport remoteBroker){
super(localBroker,remoteBroker);
public ConduitBridge(NetworkBridgeConfiguration configuration,Transport localBroker,Transport remoteBroker){
super(configuration,localBroker,remoteBroker);
}
protected DemandSubscription createDemandSubscription(ConsumerInfo info) throws IOException{

View File

@ -40,8 +40,8 @@ public class DemandForwardingBridge extends DemandForwardingBridgeSupport {
protected Object brokerInfoMutex = new Object();
protected BrokerId remoteBrokerId;
public DemandForwardingBridge(Transport localBroker,Transport remoteBroker){
super(localBroker, remoteBroker);
public DemandForwardingBridge(NetworkBridgeConfiguration configuration,Transport localBroker,Transport remoteBroker){
super(configuration,localBroker, remoteBroker);
}
protected void serviceRemoteBrokerInfo(Command command) throws IOException {
@ -80,7 +80,7 @@ public class DemandForwardingBridge extends DemandForwardingBridgeSupport {
}
protected NetworkBridgeFilter createNetworkBridgeFilter(ConsumerInfo info) throws IOException {
return new NetworkBridgeFilter(remoteBrokerPath[0], networkTTL);
return new NetworkBridgeFilter(remoteBrokerPath[0], configuration.getNetworkTTL());
}
protected BrokerId[] getRemoteBrokerPath(){

View File

@ -53,13 +53,16 @@ import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportDisposedIOException;
import org.apache.activemq.transport.TransportListener;
import org.apache.activemq.util.IdGenerator;
import org.apache.activemq.util.IntrospectionSupport;
import org.apache.activemq.util.LongSequenceGenerator;
import org.apache.activemq.util.MarshallingSupport;
import org.apache.activemq.util.ServiceStopper;
import org.apache.activemq.util.ServiceSupport;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import java.security.GeneralSecurityException;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
@ -69,7 +72,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
*
* @version $Revision$
*/
public abstract class DemandForwardingBridgeSupport implements Bridge {
public abstract class DemandForwardingBridgeSupport implements NetworkBridge {
protected static final Log log = LogFactory.getLog(DemandForwardingBridge.class);
protected final Transport localBroker;
protected final Transport remoteBroker;
@ -79,15 +82,8 @@ public abstract class DemandForwardingBridgeSupport implements Bridge {
protected ConnectionInfo remoteConnectionInfo;
protected SessionInfo localSessionInfo;
protected ProducerInfo producerInfo;
protected String localBrokerName = "Unknown";
protected String remoteBrokerName = "Unknown";
protected String localClientId;
protected String userName;
protected String password;
protected int prefetchSize = 1000;
protected boolean dispatchAsync;
protected String destinationFilter = ">";
protected boolean bridgeTempDestinations = true;
protected String name = "bridge";
protected ConsumerInfo demandConsumerInfo;
protected int demandConsumerDispatched;
@ -104,14 +100,14 @@ public abstract class DemandForwardingBridgeSupport implements Bridge {
protected final BrokerId localBrokerPath[] = new BrokerId[] { null };
protected CountDownLatch startedLatch = new CountDownLatch(2);
protected CountDownLatch remoteBrokerNameKnownLatch = new CountDownLatch(1);
protected boolean decreaseNetworkConsumerPriority;
protected int networkTTL = 1;
protected final AtomicBoolean remoteInterupted = new AtomicBoolean(false);
protected final AtomicBoolean lastConnectSucceeded = new AtomicBoolean(false);
protected boolean duplex = false;
protected NetworkBridgeConfiguration configuration;
private NetworkBridgeFailedListener bridgeFailedListener;
public DemandForwardingBridgeSupport(final Transport localBroker, final Transport remoteBroker) {
public DemandForwardingBridgeSupport(NetworkBridgeConfiguration configuration, Transport localBroker, Transport remoteBroker) {
this.configuration=configuration;
this.localBroker = localBroker;
this.remoteBroker = remoteBroker;
}
@ -236,8 +232,8 @@ public abstract class DemandForwardingBridgeSupport implements Bridge {
localConnectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId()));
localClientId="NC_"+remoteBrokerName+"_inbound"+name;
localConnectionInfo.setClientId(localClientId);
localConnectionInfo.setUserName(userName);
localConnectionInfo.setPassword(password);
localConnectionInfo.setUserName(configuration.getUserName());
localConnectionInfo.setPassword(configuration.getPassword());
localBroker.oneway(localConnectionInfo);
localSessionInfo=new SessionInfo(localConnectionInfo,1);
@ -263,15 +259,20 @@ public abstract class DemandForwardingBridgeSupport implements Bridge {
remoteConnectionInfo=new ConnectionInfo();
remoteConnectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId()));
remoteConnectionInfo.setClientId("NC_"+localBrokerName+"_outbound"+name);
remoteConnectionInfo.setUserName(userName);
remoteConnectionInfo.setPassword(password);
remoteConnectionInfo.setClientId("NC_"+configuration.getLocalBrokerName()+"_outbound"+name);
remoteConnectionInfo.setUserName(configuration.getUserName());
remoteConnectionInfo.setPassword(configuration.getPassword());
remoteBroker.oneway(remoteConnectionInfo);
BrokerInfo brokerInfo=new BrokerInfo();
brokerInfo.setBrokerName(localBrokerName);
brokerInfo.setBrokerName(configuration.getLocalBrokerName());
brokerInfo.setNetworkConnection(true);
brokerInfo.setDuplexConnection(isDuplex());
brokerInfo.setDuplexConnection(configuration.isDuplex());
//set our properties
Properties props = new Properties();
IntrospectionSupport.getProperties(this,props,null);
String str = MarshallingSupport.propertiesToString(props);
brokerInfo.setNetworkProperties(str);
remoteBroker.oneway(brokerInfo);
SessionInfo remoteSessionInfo=new SessionInfo(remoteConnectionInfo,1);
@ -283,13 +284,13 @@ public abstract class DemandForwardingBridgeSupport implements Bridge {
// Listen to consumer advisory messages on the remote broker to determine demand.
demandConsumerInfo=new ConsumerInfo(remoteSessionInfo,1);
demandConsumerInfo.setDispatchAsync(dispatchAsync);
String advisoryTopic = AdvisorySupport.CONSUMER_ADVISORY_TOPIC_PREFIX+destinationFilter;
if( bridgeTempDestinations ) {
demandConsumerInfo.setDispatchAsync(configuration.isDispatchAsync());
String advisoryTopic = AdvisorySupport.CONSUMER_ADVISORY_TOPIC_PREFIX+configuration.getDestinationFilter();
if( configuration.isBridgeTempDestinations() ) {
advisoryTopic += ","+AdvisorySupport.TEMP_DESTINATION_COMPOSITE_ADVISORY_TOPIC;
}
demandConsumerInfo.setDestination(new ActiveMQTopic(advisoryTopic));
demandConsumerInfo.setPrefetchSize(prefetchSize);
demandConsumerInfo.setPrefetchSize(configuration.getPrefetchSize());
remoteBroker.oneway(demandConsumerInfo);
startedLatch.countDown();
@ -302,7 +303,7 @@ public abstract class DemandForwardingBridgeSupport implements Bridge {
}
public void stop() throws Exception{
log.debug(" stopping "+localBrokerName+" bridge to "+remoteBrokerName+" is disposed already ? "+disposed);
log.debug(" stopping "+configuration.getLocalBrokerName()+" bridge to "+remoteBrokerName+" is disposed already ? "+disposed);
boolean wasDisposedAlready=disposed;
if(!disposed){
try{
@ -320,13 +321,13 @@ public abstract class DemandForwardingBridgeSupport implements Bridge {
}
}
if(wasDisposedAlready){
log.debug(localBrokerName+" bridge to "+remoteBrokerName+" stopped");
log.debug(configuration.getLocalBrokerName()+" bridge to "+remoteBrokerName+" stopped");
}else{
log.info(localBrokerName+" bridge to "+remoteBrokerName+" stopped");
log.info(configuration.getLocalBrokerName()+" bridge to "+remoteBrokerName+" stopped");
}
}
protected void serviceRemoteException(Throwable error){
public void serviceRemoteException(Throwable error){
if(!disposed){
if(error instanceof SecurityException||error instanceof GeneralSecurityException){
log.error("Network connection between "+localBroker+" and "+remoteBroker
@ -342,6 +343,7 @@ public abstract class DemandForwardingBridgeSupport implements Bridge {
ServiceSupport.dispose(DemandForwardingBridgeSupport.this);
}
}.start();
fireBridgeFailed();
}
}
@ -384,25 +386,26 @@ public abstract class DemandForwardingBridgeSupport implements Bridge {
}
private void serviceRemoteConsumerAdvisory(DataStructure data) throws IOException {
final int networkTTL = configuration.getNetworkTTL();
if(data.getClass()==ConsumerInfo.class){
// Create a new local subscription
ConsumerInfo info=(ConsumerInfo) data;
BrokerId[] path=info.getBrokerPath();
if((path!=null&&path.length>= networkTTL)){
if(log.isDebugEnabled())
log.debug(localBrokerName + " Ignoring Subscription " + info + " restricted to " + networkTTL + " network hops only");
log.debug(configuration.getLocalBrokerName() + " Ignoring Subscription " + info + " restricted to " + networkTTL + " network hops only");
return;
}
if(contains(info.getBrokerPath(),localBrokerPath[0])){
// Ignore this consumer as it's a consumer we locally sent to the broker.
if(log.isDebugEnabled())
log.debug(localBrokerName + " Ignoring sub " + info + " already routed through this broker once");
log.debug(configuration.getLocalBrokerName() + " Ignoring sub " + info + " already routed through this broker once");
return;
}
if (!isPermissableDestination(info.getDestination())){
//ignore if not in the permited or in the excluded list
if(log.isDebugEnabled())
log.debug(localBrokerName + " Ignoring sub " + info + " destination " + info.getDestination() + " is not permiited");
log.debug(configuration.getLocalBrokerName() + " Ignoring sub " + info + " destination " + info.getDestination() + " is not permiited");
return;
}
// Update the packet to show where it came from.
@ -412,10 +415,10 @@ public abstract class DemandForwardingBridgeSupport implements Bridge {
if (sub != null){
addSubscription(sub);
if(log.isDebugEnabled())
log.debug(localBrokerName + " Forwarding sub on "+localBroker+" from "+remoteBrokerName+" : "+info);
log.debug(configuration.getLocalBrokerName() + " Forwarding sub on "+localBroker+" from "+remoteBrokerName+" : "+info);
}else {
if(log.isDebugEnabled())
log.debug(localBrokerName + " Ignoring sub " + info + " already subscribed to matching destination");
log.debug(configuration.getLocalBrokerName() + " Ignoring sub " + info + " already subscribed to matching destination");
}
}
else if (data.getClass()==DestinationInfo.class){
@ -454,7 +457,7 @@ public abstract class DemandForwardingBridgeSupport implements Bridge {
}
}
protected void serviceLocalException(Throwable error) {
public void serviceLocalException(Throwable error) {
if( !disposed ) {
log.info("Network connection between "+localBroker+" and "+remoteBroker+" shutdown due to a local error: "+error);
log.debug("The local Exception was:"+error,error);
@ -463,6 +466,7 @@ public abstract class DemandForwardingBridgeSupport implements Bridge {
ServiceSupport.dispose(DemandForwardingBridgeSupport.this);
}
}.start();
fireBridgeFailed();
}
}
@ -507,7 +511,7 @@ public abstract class DemandForwardingBridgeSupport implements Bridge {
if(sub!=null){
Message message= configureMessage(md);
if(trace)
log.trace("bridging "+localBrokerName+" -> "+remoteBrokerName+": "+message);
log.trace("bridging "+configuration.getLocalBrokerName()+" -> "+remoteBrokerName+": "+message);
@ -547,7 +551,7 @@ public abstract class DemandForwardingBridgeSupport implements Bridge {
}else if(command.isBrokerInfo()){
serviceLocalBrokerInfo(command);
}else if(command.isShutdownInfo()){
log.info(localBrokerName+" Shutting down");
log.info(configuration.getLocalBrokerName()+" Shutting down");
// Don't shut down the whole connector if the remote side was interrupted.
// the local transport is just shutting down temporarily until the remote side
// is restored.
@ -571,34 +575,6 @@ public abstract class DemandForwardingBridgeSupport implements Bridge {
}
}
/**
* @return prefetch size
*/
public int getPrefetchSize() {
return prefetchSize;
}
/**
* @param prefetchSize
*/
public void setPrefetchSize(int prefetchSize) {
this.prefetchSize=prefetchSize;
}
/**
* @return true if dispatch async
*/
public boolean isDispatchAsync() {
return dispatchAsync;
}
/**
* @param dispatchAsync
*/
public void setDispatchAsync(boolean dispatchAsync) {
this.dispatchAsync=dispatchAsync;
}
/**
* @return Returns the dynamicallyIncludedDestinations.
*/
@ -655,21 +631,6 @@ public abstract class DemandForwardingBridgeSupport implements Bridge {
this.durableDestinations=durableDestinations;
}
/**
* @return Returns the localBrokerName.
*/
public String getLocalBrokerName() {
return localBrokerName;
}
/**
* @param localBrokerName
* The localBrokerName to set.
*/
public void setLocalBrokerName(String localBrokerName) {
this.localBrokerName=localBrokerName;
}
/**
* @return Returns the localBroker.
*/
@ -697,34 +658,6 @@ public abstract class DemandForwardingBridgeSupport implements Bridge {
public void setName(String name) {
this.name=name;
}
/**
* @return Returns the decreaseNetworkConsumerPriority.
*/
public boolean isDecreaseNetworkConsumerPriority() {
return decreaseNetworkConsumerPriority;
}
/**
* @param decreaseNetworkConsumerPriority The decreaseNetworkConsumerPriority to set.
*/
public void setDecreaseNetworkConsumerPriority(boolean decreaseNetworkConsumerPriority) {
this.decreaseNetworkConsumerPriority=decreaseNetworkConsumerPriority;
}
/**
* @return Returns the networkTTL.
*/
public int getNetworkTTL() {
return networkTTL;
}
/**
* @param networkTTL The networkTTL to set.
*/
public void setNetworkTTL(int networkTTL) {
this.networkTTL=networkTTL;
}
public static boolean contains(BrokerId[] brokerPath, BrokerId brokerId) {
if(brokerPath!=null){
@ -757,7 +690,7 @@ public abstract class DemandForwardingBridgeSupport implements Bridge {
protected boolean isPermissableDestination(ActiveMQDestination destination) {
// Are we not bridging temp destinations?
if( destination.isTemporary() && !bridgeTempDestinations )
if( destination.isTemporary() && !configuration.isBridgeTempDestinations() )
return false;
DestinationFilter filter=DestinationFilter.parseFilter(destination);
@ -814,7 +747,7 @@ public abstract class DemandForwardingBridgeSupport implements Bridge {
result.getLocalInfo().setConsumerId(new ConsumerId(localSessionInfo.getSessionId(),consumerIdGenerator
.getNextSequenceId()));
if( decreaseNetworkConsumerPriority ) {
if( configuration.isDecreaseNetworkConsumerPriority() ) {
byte priority=ConsumerInfo.NETWORK_CONSUMER_PRIORITY;
if(priority>Byte.MIN_VALUE&&info.getBrokerPath()!=null&&info.getBrokerPath().length>1){
// The longer the path to the consumer, the less it's consumer priority.
@ -840,8 +773,8 @@ public abstract class DemandForwardingBridgeSupport implements Bridge {
}
protected void configureDemandSubscription(ConsumerInfo info, DemandSubscription sub) throws IOException {
sub.getLocalInfo().setDispatchAsync(dispatchAsync);
sub.getLocalInfo().setPrefetchSize(prefetchSize);
sub.getLocalInfo().setDispatchAsync(configuration.isDispatchAsync());
sub.getLocalInfo().setPrefetchSize(configuration.getPrefetchSize());
subscriptionMapByLocalId.put(sub.getLocalInfo().getConsumerId(),sub);
subscriptionMapByRemoteId.put(sub.getRemoteInfo().getConsumerId(),sub);
@ -877,37 +810,16 @@ public abstract class DemandForwardingBridgeSupport implements Bridge {
protected abstract void serviceRemoteBrokerInfo(Command command) throws IOException;
protected abstract BrokerId[] getRemoteBrokerPath();
public String getPassword() {
return password;
}
public void setPassword(String password) {
this.password = password;
}
public String getUserName() {
return userName;
}
public void setUserName(String userName) {
this.userName = userName;
}
public boolean isBridgeTempDestinations() {
return bridgeTempDestinations;
}
public void setBridgeTempDestinations(boolean bridgeTempDestinations) {
this.bridgeTempDestinations = bridgeTempDestinations;
}
public boolean isDuplex(){
return this.duplex;
}
public void setDuplex(boolean duplex){
this.duplex=duplex;
}
public void setNetworkBridgeFailedListener(NetworkBridgeFailedListener listener){
this.bridgeFailedListener=listener;
}
private void fireBridgeFailed() {
NetworkBridgeFailedListener l = this.bridgeFailedListener;
if (l!=null) {
l.bridgeFailed();
}
}
}

View File

@ -60,7 +60,7 @@ public class DiscoveryNetworkConnector extends NetworkConnector implements Disco
public void onServiceAdd(DiscoveryEvent event) {
// Ignore events once we start stopping.
if( isStopped() || isStopping() )
if( serviceSupport.isStopped() || serviceSupport.isStopping() )
return;
String url = event.getServiceName();
@ -106,7 +106,7 @@ public class DiscoveryNetworkConnector extends NetworkConnector implements Disco
return;
}
Bridge bridge = createBridge(localTransport, remoteTransport, event);
NetworkBridge bridge = createBridge(localTransport, remoteTransport, event);
bridges.put(uri, bridge);
try {
bridge.start();
@ -138,7 +138,7 @@ public class DiscoveryNetworkConnector extends NetworkConnector implements Disco
return;
}
Bridge bridge = (Bridge) bridges.remove(uri);
NetworkBridge bridge = (NetworkBridge) bridges.remove(uri);
if (bridge == null)
return;
@ -158,17 +158,17 @@ public class DiscoveryNetworkConnector extends NetworkConnector implements Disco
}
}
protected void doStart() throws Exception {
protected void handleStart() throws Exception {
if (discoveryAgent == null) {
throw new IllegalStateException("You must configure the 'discoveryAgent' property");
}
this.discoveryAgent.start();
super.doStart();
super.handleStart();
}
protected void doStop(ServiceStopper stopper) throws Exception {
protected void handleStop(ServiceStopper stopper) throws Exception {
for (Iterator i = bridges.values().iterator(); i.hasNext();) {
Bridge bridge = (Bridge) i.next();
NetworkBridge bridge = (NetworkBridge) i.next();
try {
bridge.stop();
}
@ -183,91 +183,24 @@ public class DiscoveryNetworkConnector extends NetworkConnector implements Disco
stopper.onException(this, e);
}
super.doStop(stopper);
super.handleStop(stopper);
}
protected Bridge createBridge(Transport localTransport, Transport remoteTransport, final DiscoveryEvent event) {
DemandForwardingBridge result = null;
if (conduitSubscriptions) {
if (dynamicOnly) {
result = new ConduitBridge(localTransport, remoteTransport) {
protected void serviceLocalException(Throwable error) {
try {
super.serviceLocalException(error);
} finally {
fireServiceFailed();
}
}
protected void serviceRemoteException(Throwable error) {
try {
super.serviceRemoteException(error);
} finally {
fireServiceFailed();
}
}
public void fireServiceFailed() {
if( !isStopped() ) {
try {
discoveryAgent.serviceFailed(event);
} catch (IOException e) {
}
}
}
};
protected NetworkBridge createBridge(Transport localTransport, Transport remoteTransport, final DiscoveryEvent event) {
NetworkBridgeFailedListener listener = new NetworkBridgeFailedListener() {
public void bridgeFailed(){
if( !serviceSupport.isStopped() ) {
try {
discoveryAgent.serviceFailed(event);
} catch (IOException e) {
}
}
}
else {
result = new DurableConduitBridge(localTransport, remoteTransport) {
protected void serviceLocalException(Throwable error) {
try {
super.serviceLocalException(error);
} finally {
fireServiceFailed();
}
}
protected void serviceRemoteException(Throwable error) {
try {
super.serviceRemoteException(error);
} finally {
fireServiceFailed();
}
}
public void fireServiceFailed() {
if( !isStopped() ) {
try {
discoveryAgent.serviceFailed(event);
} catch (IOException e) {
}
}
}
};
}
}
else {
result = new DemandForwardingBridge(localTransport, remoteTransport) {
protected void serviceLocalException(Throwable error) {
try {
super.serviceLocalException(error);
} finally {
fireServiceFailed();
}
}
protected void serviceRemoteException(Throwable error) {
try {
super.serviceRemoteException(error);
} finally {
fireServiceFailed();
}
}
public void fireServiceFailed() {
if( !isStopped() ) {
try {
discoveryAgent.serviceFailed(event);
} catch (IOException e) {
}
}
}
};
}
};
DemandForwardingBridge result = NetworkBridgeFactory.createBridge(this,localTransport,remoteTransport,listener);
return configureBridge(result);
}
@ -275,4 +208,6 @@ public class DiscoveryNetworkConnector extends NetworkConnector implements Disco
return discoveryAgent.toString();
}
}

View File

@ -37,12 +37,13 @@ public class DurableConduitBridge extends ConduitBridge{
/**
* Constructor
* @param configuration
*
* @param localBroker
* @param remoteBroker
*/
public DurableConduitBridge(Transport localBroker,Transport remoteBroker){
super(localBroker,remoteBroker);
public DurableConduitBridge(NetworkBridgeConfiguration configuration,Transport localBroker,Transport remoteBroker){
super(configuration,localBroker,remoteBroker);
}
/**
@ -92,7 +93,7 @@ public class DurableConduitBridge extends ConduitBridge{
}
protected String getSubscriberName(ActiveMQDestination dest){
String subscriberName = getLocalBrokerName()+"_"+dest.getPhysicalName();
String subscriberName = configuration.getLocalBrokerName()+"_"+dest.getPhysicalName();
return subscriberName;
}

View File

@ -19,6 +19,7 @@ package org.apache.activemq.network;
import java.io.IOException;
import org.apache.activemq.Service;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.BrokerId;
@ -52,7 +53,7 @@ import org.apache.commons.logging.LogFactory;
*
* @version $Revision$
*/
public class ForwardingBridge implements Bridge {
public class ForwardingBridge implements Service{
static final private Log log = LogFactory.getLog(ForwardingBridge.class);
@ -76,6 +77,7 @@ public class ForwardingBridge implements Bridge {
BrokerId localBrokerId;
BrokerId remoteBrokerId;
private NetworkBridgeFailedListener bridgeFailedListener;
public ForwardingBridge(Transport localBroker, Transport remoteBroker) {
this.localBroker = localBroker;
@ -179,7 +181,7 @@ public class ForwardingBridge implements Bridge {
}
}
protected void serviceRemoteException(IOException error) {
public void serviceRemoteException(Throwable error) {
log.info("Unexpected remote exception: "+error);
log.debug("Exception trace: ", error);
}
@ -206,9 +208,10 @@ public class ForwardingBridge implements Bridge {
}
}
protected void serviceLocalException(Throwable error) {
public void serviceLocalException(Throwable error) {
log.info("Unexpected local exception: "+error);
log.debug("Exception trace: ", error);
fireBridgeFailed();
}
protected void serviceLocalCommand(Command command) {
try {
@ -319,4 +322,16 @@ public class ForwardingBridge implements Bridge {
public void setDestinationFilter(String destinationFilter) {
this.destinationFilter = destinationFilter;
}
public void setNetworkBridgeFailedListener(NetworkBridgeFailedListener listener){
this.bridgeFailedListener=listener;
}
private void fireBridgeFailed() {
NetworkBridgeFailedListener l = this.bridgeFailedListener;
if (l!=null) {
l.bridgeFailed();
}
}
}

View File

@ -94,7 +94,7 @@ public class MulticastNetworkConnector extends NetworkConnector {
// Implementation methods
// -------------------------------------------------------------------------
protected void doStart() throws Exception {
protected void handleStart() throws Exception {
if (remoteTransport == null) {
if (remoteURI == null) {
throw new IllegalArgumentException("You must specify the remoteURI property");
@ -114,11 +114,11 @@ public class MulticastNetworkConnector extends NetworkConnector {
remoteTransport.start();
localTransport.start();
super.doStart();
super.handleStart();
}
protected void doStop(ServiceStopper stopper) throws Exception {
super.doStop(stopper);
protected void handleStop(ServiceStopper stopper) throws Exception {
super.handleStop(stopper);
if (bridge != null) {
try {
bridge.stop();
@ -150,7 +150,7 @@ public class MulticastNetworkConnector extends NetworkConnector {
}
protected DemandForwardingBridgeSupport createBridge(Transport local, Transport remote) {
return new CompositeDemandForwardingBridge(local, remote);
return new CompositeDemandForwardingBridge(this,local, remote);
}
}

View File

@ -0,0 +1,47 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.network;
import org.apache.activemq.Service;
/**
* Represents a network bridge interface
*
* @version $Revision: 1.1 $
*/
public interface NetworkBridge extends Service {
/**
* Service an exception
* @param error
*/
public void serviceRemoteException(Throwable error);
/**
* servicee an exception
* @param error
*/
public void serviceLocalException(Throwable error);
/**
* Set the NetworkBridgeFailedListener
* @param listener
*/
public void setNetworkBridgeFailedListener(NetworkBridgeFailedListener listener);
}

View File

@ -0,0 +1,224 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.apache.activemq.network;
/**
* Configuration for a NetworkBridge
*
* @version $Revision: 1.1 $
*/
public class NetworkBridgeConfiguration{
private boolean conduitSubscriptions=true;
private boolean dynamicOnly=false;
private boolean dispatchAsync=true;
private boolean decreaseNetworkConsumerPriority=false;
private boolean duplex=false;
private boolean bridgeTempDestinations=true;
private int prefetchSize=1000;
private int networkTTL=1;
private String localBrokerName="Unknow";
private String userName;
private String password;
private String destinationFilter = ">";
/**
* @return the conduitSubscriptions
*/
public boolean isConduitSubscriptions(){
return this.conduitSubscriptions;
}
/**
* @param conduitSubscriptions the conduitSubscriptions to set
*/
public void setConduitSubscriptions(boolean conduitSubscriptions){
this.conduitSubscriptions=conduitSubscriptions;
}
/**
* @return the dynamicOnly
*/
public boolean isDynamicOnly(){
return this.dynamicOnly;
}
/**
* @param dynamicOnly the dynamicOnly to set
*/
public void setDynamicOnly(boolean dynamicOnly){
this.dynamicOnly=dynamicOnly;
}
/**
* @return the bridgeTempDestinations
*/
public boolean isBridgeTempDestinations(){
return this.bridgeTempDestinations;
}
/**
* @param bridgeTempDestinations the bridgeTempDestinations to set
*/
public void setBridgeTempDestinations(boolean bridgeTempDestinations){
this.bridgeTempDestinations=bridgeTempDestinations;
}
/**
* @return the decreaseNetworkConsumerPriority
*/
public boolean isDecreaseNetworkConsumerPriority(){
return this.decreaseNetworkConsumerPriority;
}
/**
* @param decreaseNetworkConsumerPriority the decreaseNetworkConsumerPriority to set
*/
public void setDecreaseNetworkConsumerPriority(boolean decreaseNetworkConsumerPriority){
this.decreaseNetworkConsumerPriority=decreaseNetworkConsumerPriority;
}
/**
* @return the dispatchAsync
*/
public boolean isDispatchAsync(){
return this.dispatchAsync;
}
/**
* @param dispatchAsync the dispatchAsync to set
*/
public void setDispatchAsync(boolean dispatchAsync){
this.dispatchAsync=dispatchAsync;
}
/**
* @return the duplex
*/
public boolean isDuplex(){
return this.duplex;
}
/**
* @param duplex the duplex to set
*/
public void setDuplex(boolean duplex){
this.duplex=duplex;
}
/**
* @return the localBrokerName
*/
public String getLocalBrokerName(){
return this.localBrokerName;
}
/**
* @param localBrokerName the localBrokerName to set
*/
public void setLocalBrokerName(String localBrokerName){
this.localBrokerName=localBrokerName;
}
/**
* @return the networkTTL
*/
public int getNetworkTTL(){
return this.networkTTL;
}
/**
* @param networkTTL the networkTTL to set
*/
public void setNetworkTTL(int networkTTL){
this.networkTTL=networkTTL;
}
/**
* @return the password
*/
public String getPassword(){
return this.password;
}
/**
* @param password the password to set
*/
public void setPassword(String password){
this.password=password;
}
/**
* @return the prefetchSize
*/
public int getPrefetchSize(){
return this.prefetchSize;
}
/**
* @param prefetchSize the prefetchSize to set
*/
public void setPrefetchSize(int prefetchSize){
this.prefetchSize=prefetchSize;
}
/**
* @return the userName
*/
public String getUserName(){
return this.userName;
}
/**
* @param userName the userName to set
*/
public void setUserName(String userName){
this.userName=userName;
}
/**
* @return the destinationFilter
*/
public String getDestinationFilter(){
return this.destinationFilter;
}
/**
* @param destinationFilter the destinationFilter to set
*/
public void setDestinationFilter(String destinationFilter){
this.destinationFilter=destinationFilter;
}
}

View File

@ -0,0 +1,65 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.apache.activemq.network;
import org.apache.activemq.transport.Transport;
/**
* Factory for network bridges
*
* @version $Revision: 1.1 $
*/
public class NetworkBridgeFactory{
/**
* Create a network bridge
*
* @param config
* @param localTransport
* @param remoteTransport
* @return the NetworkBridge
*/
public static DemandForwardingBridge createBridge(NetworkBridgeConfiguration config,Transport localTransport,
Transport remoteTransport){
return createBridge(config,localTransport,remoteTransport,null);
}
/**
* create a network bridge
*
* @param configuration
* @param localTransport
* @param remoteTransport
* @param listener
* @return the NetworkBridge
*/
public static DemandForwardingBridge createBridge(NetworkBridgeConfiguration configuration,Transport localTransport,
Transport remoteTransport,NetworkBridgeFailedListener listener){
DemandForwardingBridge result=null;
if(configuration.isConduitSubscriptions()){
if(configuration.isDynamicOnly()){
result=new ConduitBridge(configuration,localTransport,remoteTransport);
}else{
result=new DurableConduitBridge(configuration,localTransport,remoteTransport);
}
}else{
result=new DemandForwardingBridge(configuration,localTransport,remoteTransport);
}
if(listener!=null){
result.setNetworkBridgeFailedListener(listener);
}
return result;
}
}

View File

@ -17,14 +17,18 @@
*/
package org.apache.activemq.network;
import org.apache.activemq.Service;
/**
* Represents a network bridge interface
*called when a bridge fails
*
* @version $Revision: 1.1 $
*/
public interface Bridge extends Service {
public interface NetworkBridgeFailedListener{
/**
* called when the transport fails
*
*/
public void bridgeFailed();
}

View File

@ -1,27 +1,26 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.apache.activemq.network;
import static org.apache.activemq.network.NetworkConnector.log;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import org.apache.activemq.Service;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportFactory;
@ -30,242 +29,167 @@ import org.apache.activemq.util.ServiceSupport;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import java.util.concurrent.CopyOnWriteArrayList;
/**
* @version $Revision$
*/
public abstract class NetworkConnector extends ServiceSupport {
public abstract class NetworkConnector extends NetworkBridgeConfiguration implements Service{
protected static final Log log = LogFactory.getLog(NetworkConnector.class);
protected static final Log log=LogFactory.getLog(NetworkConnector.class);
protected URI localURI;
private String brokerName = "localhost";
private String brokerName="localhost";
private Set durableDestinations;
private List excludedDestinations = new CopyOnWriteArrayList();
private List dynamicallyIncludedDestinations = new CopyOnWriteArrayList();
private List staticallyIncludedDestinations = new CopyOnWriteArrayList();
protected boolean dynamicOnly = false;
protected boolean conduitSubscriptions = true;
private boolean decreaseNetworkConsumerPriority;
private int networkTTL = 1;
private String name = "bridge";
private int prefetchSize = 1000;
private boolean dispatchAsync = true;
private String userName;
private String password;
private boolean bridgeTempDestinations=true;
private boolean duplex = false;
private List excludedDestinations=new CopyOnWriteArrayList();
private List dynamicallyIncludedDestinations=new CopyOnWriteArrayList();
private List staticallyIncludedDestinations=new CopyOnWriteArrayList();
private String name="bridge";
protected ConnectionFilter connectionFilter;
protected ServiceSupport serviceSupport=new ServiceSupport(){
public NetworkConnector() {
protected void doStart() throws Exception{
handleStart();
}
protected void doStop(ServiceStopper stopper) throws Exception{
handleStop(stopper);
}
};
public NetworkConnector(){
}
public NetworkConnector(URI localURI) {
this.localURI = localURI;
public NetworkConnector(URI localURI){
this.localURI=localURI;
}
public URI getLocalUri() throws URISyntaxException {
public URI getLocalUri() throws URISyntaxException{
return localURI;
}
public void setLocalUri(URI localURI) {
this.localURI = localURI;
public void setLocalUri(URI localURI){
this.localURI=localURI;
}
/**
* @return Returns the name.
*/
public String getName() {
if (name == null) {
name = createName();
public String getName(){
if(name==null){
name=createName();
}
return name;
}
/**
* @param name
* The name to set.
* @param name The name to set.
*/
public void setName(String name) {
this.name = name;
public void setName(String name){
this.name=name;
}
public String getBrokerName() {
public String getBrokerName(){
return brokerName;
}
/**
* @param brokerName
* The brokerName to set.
* @param brokerName The brokerName to set.
*/
public void setBrokerName(String brokerName) {
this.brokerName = brokerName;
public void setBrokerName(String brokerName){
this.brokerName=brokerName;
}
/**
* @return Returns the durableDestinations.
*/
public Set getDurableDestinations() {
public Set getDurableDestinations(){
return durableDestinations;
}
/**
* @param durableDestinations
* The durableDestinations to set.
* @param durableDestinations The durableDestinations to set.
*/
public void setDurableDestinations(Set durableDestinations) {
this.durableDestinations = durableDestinations;
}
/**
* @return Returns the dynamicOnly.
*/
public boolean isDynamicOnly() {
return dynamicOnly;
}
/**
* @param dynamicOnly
* The dynamicOnly to set.
*/
public void setDynamicOnly(boolean dynamicOnly) {
this.dynamicOnly = dynamicOnly;
}
/**
* @return Returns the conduitSubscriptions.
*/
public boolean isConduitSubscriptions() {
return conduitSubscriptions;
}
/**
* @param conduitSubscriptions
* The conduitSubscriptions to set.
*/
public void setConduitSubscriptions(boolean conduitSubscriptions) {
this.conduitSubscriptions = conduitSubscriptions;
}
/**
* @return Returns the decreaseNetworkConsumerPriority.
*/
public boolean isDecreaseNetworkConsumerPriority() {
return decreaseNetworkConsumerPriority;
}
/**
* @param decreaseNetworkConsumerPriority
* The decreaseNetworkConsumerPriority to set.
*/
public void setDecreaseNetworkConsumerPriority(boolean decreaseNetworkConsumerPriority) {
this.decreaseNetworkConsumerPriority = decreaseNetworkConsumerPriority;
}
/**
* @return Returns the networkTTL.
*/
public int getNetworkTTL() {
return networkTTL;
}
/**
* @param networkTTL
* The networkTTL to set.
*/
public void setNetworkTTL(int networkTTL) {
this.networkTTL = networkTTL;
public void setDurableDestinations(Set durableDestinations){
this.durableDestinations=durableDestinations;
}
/**
* @return Returns the excludedDestinations.
*/
public List getExcludedDestinations() {
public List getExcludedDestinations(){
return excludedDestinations;
}
/**
* @param excludedDestinations
* The excludedDestinations to set.
* @param excludedDestinations The excludedDestinations to set.
*/
public void setExcludedDestinations(List exludedDestinations) {
this.excludedDestinations = exludedDestinations;
public void setExcludedDestinations(List exludedDestinations){
this.excludedDestinations=exludedDestinations;
}
public void addExcludedDestination(ActiveMQDestination destiantion) {
public void addExcludedDestination(ActiveMQDestination destiantion){
this.excludedDestinations.add(destiantion);
}
/**
* @return Returns the staticallyIncludedDestinations.
*/
public List getStaticallyIncludedDestinations() {
public List getStaticallyIncludedDestinations(){
return staticallyIncludedDestinations;
}
/**
* @param staticallyIncludedDestinations
* The staticallyIncludedDestinations to set.
* @param staticallyIncludedDestinations The staticallyIncludedDestinations to set.
*/
public void setStaticallyIncludedDestinations(List staticallyIncludedDestinations) {
this.staticallyIncludedDestinations = staticallyIncludedDestinations;
public void setStaticallyIncludedDestinations(List staticallyIncludedDestinations){
this.staticallyIncludedDestinations=staticallyIncludedDestinations;
}
public void addStaticallyIncludedDestination(ActiveMQDestination destiantion) {
public void addStaticallyIncludedDestination(ActiveMQDestination destiantion){
this.staticallyIncludedDestinations.add(destiantion);
}
/**
* @return Returns the dynamicallyIncludedDestinations.
*/
public List getDynamicallyIncludedDestinations() {
public List getDynamicallyIncludedDestinations(){
return dynamicallyIncludedDestinations;
}
/**
* @param dynamicallyIncludedDestinations
* The dynamicallyIncludedDestinations to set.
* @param dynamicallyIncludedDestinations The dynamicallyIncludedDestinations to set.
*/
public void setDynamicallyIncludedDestinations(List dynamicallyIncludedDestinations) {
this.dynamicallyIncludedDestinations = dynamicallyIncludedDestinations;
public void setDynamicallyIncludedDestinations(List dynamicallyIncludedDestinations){
this.dynamicallyIncludedDestinations=dynamicallyIncludedDestinations;
}
public void addDynamicallyIncludedDestination(ActiveMQDestination destiantion) {
public void addDynamicallyIncludedDestination(ActiveMQDestination destiantion){
this.dynamicallyIncludedDestinations.add(destiantion);
}
public ConnectionFilter getConnectionFilter(){
return connectionFilter;
}
public void setConnectionFilter(ConnectionFilter connectionFilter){
this.connectionFilter=connectionFilter;
}
// Implementation methods
// -------------------------------------------------------------------------
protected Bridge configureBridge(DemandForwardingBridgeSupport result) {
result.setLocalBrokerName(getBrokerName());
protected NetworkBridge configureBridge(DemandForwardingBridgeSupport result){
result.setName(getBrokerName());
result.setNetworkTTL(getNetworkTTL());
result.setUserName(userName);
result.setPassword(password);
result.setPrefetchSize(prefetchSize);
result.setDispatchAsync(dispatchAsync);
result.setDecreaseNetworkConsumerPriority(isDecreaseNetworkConsumerPriority());
result.setDuplex(isDuplex());
List destsList = getDynamicallyIncludedDestinations();
ActiveMQDestination dests[] = (ActiveMQDestination[]) destsList.toArray(new ActiveMQDestination[destsList.size()]);
List destsList=getDynamicallyIncludedDestinations();
ActiveMQDestination dests[]=(ActiveMQDestination[])destsList.toArray(new ActiveMQDestination[destsList.size()]);
result.setDynamicallyIncludedDestinations(dests);
destsList = getExcludedDestinations();
dests = (ActiveMQDestination[]) destsList.toArray(new ActiveMQDestination[destsList.size()]);
destsList=getExcludedDestinations();
dests=(ActiveMQDestination[])destsList.toArray(new ActiveMQDestination[destsList.size()]);
result.setExcludedDestinations(dests);
destsList = getStaticallyIncludedDestinations();
dests = (ActiveMQDestination[]) destsList.toArray(new ActiveMQDestination[destsList.size()]);
destsList=getStaticallyIncludedDestinations();
dests=(ActiveMQDestination[])destsList.toArray(new ActiveMQDestination[destsList.size()]);
result.setStaticallyIncludedDestinations(dests);
result.setBridgeTempDestinations(bridgeTempDestinations);
if (durableDestinations != null) {
ActiveMQDestination[] dest = new ActiveMQDestination[durableDestinations.size()];
dest = (ActiveMQDestination[]) durableDestinations.toArray(dest);
if(durableDestinations!=null){
ActiveMQDestination[] dest=new ActiveMQDestination[durableDestinations.size()];
dest=(ActiveMQDestination[])durableDestinations.toArray(dest);
result.setDurableDestinations(dest);
}
return result;
@ -273,82 +197,26 @@ public abstract class NetworkConnector extends ServiceSupport {
protected abstract String createName();
protected void doStart() throws Exception {
if (localURI == null) {
protected Transport createLocalTransport() throws Exception{
return TransportFactory.connect(localURI);
}
public void start() throws Exception{
serviceSupport.start();
}
public void stop() throws Exception{
serviceSupport.stop();
}
protected void handleStart() throws Exception{
if(localURI==null){
throw new IllegalStateException("You must configure the 'localURI' property");
}
log.info("Network Connector "+getName()+" Started");
}
protected void doStop(ServiceStopper stopper) throws Exception {
protected void handleStop(ServiceStopper stopper) throws Exception{
log.info("Network Connector "+getName()+" Stopped");
}
protected Transport createLocalTransport() throws Exception {
return TransportFactory.connect(localURI);
}
public boolean isDispatchAsync() {
return dispatchAsync;
}
public void setDispatchAsync(boolean dispatchAsync) {
this.dispatchAsync = dispatchAsync;
}
public int getPrefetchSize() {
return prefetchSize;
}
public void setPrefetchSize(int prefetchSize) {
this.prefetchSize = prefetchSize;
}
public ConnectionFilter getConnectionFilter() {
return connectionFilter;
}
public void setConnectionFilter(ConnectionFilter connectionFilter) {
this.connectionFilter = connectionFilter;
}
public String getPassword() {
return password;
}
public void setPassword(String password) {
this.password = password;
}
public String getUserName() {
return userName;
}
public void setUserName(String userName) {
this.userName = userName;
}
public boolean isBridgeTempDestinations() {
return bridgeTempDestinations;
}
public void setBridgeTempDestinations(boolean bridgeTempDestinations) {
this.bridgeTempDestinations = bridgeTempDestinations;
}
/**
* @return the duplex
*/
public boolean isDuplex(){
return this.duplex;
}
/**
* @param duplex the duplex to set
*/
public void setDuplex(boolean duplex){
this.duplex=duplex;
}
}

View File

@ -88,6 +88,7 @@ public class BrokerInfoMarshaller extends BaseCommandMarshaller {
info.setNetworkConnection(bs.readBoolean());
info.setConnectionId(tightUnmarshalLong(wireFormat, dataIn, bs));
info.setBrokerUploadUrl(tightUnmarshalString(dataIn, bs));
info.setNetworkProperties(tightUnmarshalString(dataIn, bs));
}
@ -111,6 +112,7 @@ public class BrokerInfoMarshaller extends BaseCommandMarshaller {
bs.writeBoolean(info.isNetworkConnection());
rc+=tightMarshalLong1(wireFormat, info.getConnectionId(), bs);
rc += tightMarshalString1(info.getBrokerUploadUrl(), bs);
rc += tightMarshalString1(info.getNetworkProperties(), bs);
return rc + 0;
}
@ -137,6 +139,7 @@ public class BrokerInfoMarshaller extends BaseCommandMarshaller {
bs.readBoolean();
tightMarshalLong2(wireFormat, info.getConnectionId(), dataOut, bs);
tightMarshalString2(info.getBrokerUploadUrl(), dataOut, bs);
tightMarshalString2(info.getNetworkProperties(), dataOut, bs);
}
@ -173,6 +176,7 @@ public class BrokerInfoMarshaller extends BaseCommandMarshaller {
info.setNetworkConnection(dataIn.readBoolean());
info.setConnectionId(looseUnmarshalLong(wireFormat, dataIn));
info.setBrokerUploadUrl(looseUnmarshalString(dataIn));
info.setNetworkProperties(looseUnmarshalString(dataIn));
}
@ -196,6 +200,7 @@ public class BrokerInfoMarshaller extends BaseCommandMarshaller {
dataOut.writeBoolean(info.isNetworkConnection());
looseMarshalLong(wireFormat, info.getConnectionId(), dataOut);
looseMarshalString(info.getBrokerUploadUrl(), dataOut);
looseMarshalString(info.getNetworkProperties(), dataOut);
}
}

View File

@ -69,6 +69,8 @@ public class MessagePullMarshaller extends BaseCommandMarshaller {
info.setConsumerId((org.apache.activemq.command.ConsumerId) tightUnmarsalCachedObject(wireFormat, dataIn, bs));
info.setDestination((org.apache.activemq.command.ActiveMQDestination) tightUnmarsalCachedObject(wireFormat, dataIn, bs));
info.setTimeout(tightUnmarshalLong(wireFormat, dataIn, bs));
info.setCorrelationId(tightUnmarshalString(dataIn, bs));
info.setMessageId((org.apache.activemq.command.MessageId) tightUnmarsalNestedObject(wireFormat, dataIn, bs));
}
@ -84,6 +86,8 @@ public class MessagePullMarshaller extends BaseCommandMarshaller {
rc += tightMarshalCachedObject1(wireFormat, (DataStructure)info.getConsumerId(), bs);
rc += tightMarshalCachedObject1(wireFormat, (DataStructure)info.getDestination(), bs);
rc+=tightMarshalLong1(wireFormat, info.getTimeout(), bs);
rc += tightMarshalString1(info.getCorrelationId(), bs);
rc += tightMarshalNestedObject1(wireFormat, (DataStructure)info.getMessageId(), bs);
return rc + 0;
}
@ -102,6 +106,8 @@ public class MessagePullMarshaller extends BaseCommandMarshaller {
tightMarshalCachedObject2(wireFormat, (DataStructure)info.getConsumerId(), dataOut, bs);
tightMarshalCachedObject2(wireFormat, (DataStructure)info.getDestination(), dataOut, bs);
tightMarshalLong2(wireFormat, info.getTimeout(), dataOut, bs);
tightMarshalString2(info.getCorrelationId(), dataOut, bs);
tightMarshalNestedObject2(wireFormat, (DataStructure)info.getMessageId(), dataOut, bs);
}
@ -119,6 +125,8 @@ public class MessagePullMarshaller extends BaseCommandMarshaller {
info.setConsumerId((org.apache.activemq.command.ConsumerId) looseUnmarsalCachedObject(wireFormat, dataIn));
info.setDestination((org.apache.activemq.command.ActiveMQDestination) looseUnmarsalCachedObject(wireFormat, dataIn));
info.setTimeout(looseUnmarshalLong(wireFormat, dataIn));
info.setCorrelationId(looseUnmarshalString(dataIn));
info.setMessageId((org.apache.activemq.command.MessageId) looseUnmarsalNestedObject(wireFormat, dataIn));
}
@ -134,6 +142,8 @@ public class MessagePullMarshaller extends BaseCommandMarshaller {
looseMarshalCachedObject(wireFormat, (DataStructure)info.getConsumerId(), dataOut);
looseMarshalCachedObject(wireFormat, (DataStructure)info.getDestination(), dataOut);
looseMarshalLong(wireFormat, info.getTimeout(), dataOut);
looseMarshalString(info.getCorrelationId(), dataOut);
looseMarshalNestedObject(wireFormat, (DataStructure)info.getMessageId(), dataOut);
}
}

View File

@ -22,12 +22,15 @@ import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.StringWriter;
import java.io.UTFDataFormatException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
/**
*
@ -368,6 +371,25 @@ public class MarshallingSupport {
return null;
}
}
public static String propertiesToString(Properties props) throws IOException{
String result="";
if(props!=null){
DataByteArrayOutputStream dataOut=new DataByteArrayOutputStream();
props.store(dataOut,"");
result=new String(dataOut.getData(),0,dataOut.size());
}
return result;
}
public static Properties stringToProperties(String str) throws IOException {
Properties result = new Properties();
if (str != null && str.length() > 0 ) {
DataByteArrayInputStream dataIn = new DataByteArrayInputStream(str.getBytes());
result.load(dataIn);
}
return result;
}
}

View File

@ -125,9 +125,10 @@ public class DemandForwardingBridgeTest extends NetworkTestSupport {
protected void setUp() throws Exception {
super.setUp();
bridge = new DemandForwardingBridge(createTransport(), createRemoteTransport());
bridge.setLocalBrokerName("local");
bridge.setDispatchAsync(false);
NetworkBridgeConfiguration config = new NetworkBridgeConfiguration();
config.setLocalBrokerName("local");
config.setDispatchAsync(false);
bridge = new DemandForwardingBridge(config,createTransport(), createRemoteTransport());
bridge.start();
// PATCH: Give demand forwarding bridge a chance to finish setting up

View File

@ -68,5 +68,6 @@ public class BrokerInfoTest extends BaseCommandTestSupport {
info.setNetworkConnection(true);
info.setConnectionId(1);
info.setBrokerUploadUrl("BrokerUploadUrl:5");
info.setNetworkProperties("NetworkProperties:6");
}
}

View File

@ -54,5 +54,7 @@ public class MessagePullTest extends BaseCommandTestSupport {
info.setConsumerId(createConsumerId("ConsumerId:1"));
info.setDestination(createActiveMQDestination("Destination:2"));
info.setTimeout(1);
info.setCorrelationId("CorrelationId:3");
info.setMessageId(createMessageId("MessageId:4"));
}
}

View File

@ -20,6 +20,7 @@ package org.apache.activemq.usecases;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.network.DemandForwardingBridge;
import org.apache.activemq.network.NetworkBridgeConfiguration;
import org.apache.activemq.transport.TransportFactory;
import java.util.List;
@ -56,9 +57,10 @@ public class MultiBrokersMultiClientsUsingTcpTest extends MultiBrokersMultiClien
// Ensure that we are connecting using tcp
if (remoteURI.toString().startsWith("tcp:") && localURI.toString().startsWith("tcp:")) {
DemandForwardingBridge bridge = new DemandForwardingBridge(TransportFactory.connect(localURI),
NetworkBridgeConfiguration config = new NetworkBridgeConfiguration();
config.setLocalBrokerName(localBroker.getBrokerName());
DemandForwardingBridge bridge = new DemandForwardingBridge(config,TransportFactory.connect(localURI),
TransportFactory.connect(remoteURI));
bridge.setLocalBrokerName(localBroker.getBrokerName());
bridges.add(bridge);
bridge.start();

View File

@ -20,6 +20,7 @@ package org.apache.activemq.usecases;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.network.DemandForwardingBridge;
import org.apache.activemq.network.NetworkBridgeConfiguration;
import org.apache.activemq.transport.TransportFactory;
import java.util.List;
@ -43,9 +44,10 @@ public class ThreeBrokerQueueNetworkUsingTcpTest extends ThreeBrokerQueueNetwork
// Ensure that we are connecting using tcp
if (remoteURI.toString().startsWith("tcp:") && localURI.toString().startsWith("tcp:")) {
DemandForwardingBridge bridge = new DemandForwardingBridge(TransportFactory.connect(localURI),
NetworkBridgeConfiguration config = new NetworkBridgeConfiguration();
config.setLocalBrokerName(localBroker.getBrokerName());
DemandForwardingBridge bridge = new DemandForwardingBridge(config,TransportFactory.connect(localURI),
TransportFactory.connect(remoteURI));
bridge.setLocalBrokerName(localBroker.getBrokerName());
bridges.add(bridge);
bridge.start();

View File

@ -20,6 +20,7 @@ package org.apache.activemq.usecases;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.network.DemandForwardingBridge;
import org.apache.activemq.network.NetworkBridgeConfiguration;
import org.apache.activemq.transport.TransportFactory;
import java.util.List;
@ -43,9 +44,10 @@ public class ThreeBrokerTopicNetworkUsingTcpTest extends ThreeBrokerTopicNetwork
// Ensure that we are connecting using tcp
if (remoteURI.toString().startsWith("tcp:") && localURI.toString().startsWith("tcp:")) {
DemandForwardingBridge bridge = new DemandForwardingBridge(TransportFactory.connect(localURI),
NetworkBridgeConfiguration config = new NetworkBridgeConfiguration();
config.setLocalBrokerName(localBroker.getBrokerName());
DemandForwardingBridge bridge = new DemandForwardingBridge(config,TransportFactory.connect(localURI),
TransportFactory.connect(remoteURI));
bridge.setLocalBrokerName(localBroker.getBrokerName());
bridges.add(bridge);
bridge.start();

View File

@ -20,6 +20,7 @@ package org.apache.activemq.usecases;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.network.DemandForwardingBridge;
import org.apache.activemq.network.NetworkBridgeConfiguration;
import org.apache.activemq.transport.TransportFactory;
import org.apache.activemq.JmsMultipleBrokersTestSupport;
import org.apache.activemq.command.Command;
@ -115,7 +116,9 @@ public class TwoBrokerMessageNotSentToRemoteWhenNoConsumerTest extends JmsMultip
// Ensure that we are connecting using tcp
if (remoteURI.toString().startsWith("tcp:") && localURI.toString().startsWith("tcp:")) {
DemandForwardingBridge bridge = new DemandForwardingBridge(TransportFactory.connect(localURI),
NetworkBridgeConfiguration config = new NetworkBridgeConfiguration();
config.setLocalBrokerName(localBroker.getBrokerName());
DemandForwardingBridge bridge = new DemandForwardingBridge(config,TransportFactory.connect(localURI),
TransportFactory.connect(remoteURI)) {
protected void serviceLocalCommand(Command command) {
if (command.isMessageDispatch()) {
@ -126,7 +129,6 @@ public class TwoBrokerMessageNotSentToRemoteWhenNoConsumerTest extends JmsMultip
super.serviceLocalCommand(command);
}
};
bridge.setLocalBrokerName(localBroker.getBrokerName());
bridges.add(bridge);
bridge.start();