rationalize the QueueBridge/TopicBridge

git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@376019 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2006-02-08 18:26:35 +00:00
parent d52e6ac042
commit 49e8a803aa
6 changed files with 105 additions and 183 deletions

View File

@ -16,6 +16,7 @@
*/ */
package org.apache.activemq.network.jms; package org.apache.activemq.network.jms;
import javax.jms.Connection;
import javax.jms.Destination; import javax.jms.Destination;
import javax.jms.JMSException; import javax.jms.JMSException;
import javax.jms.Message; import javax.jms.Message;
@ -39,6 +40,7 @@ abstract class DestinationBridge implements Service,MessageListener{
protected AtomicBoolean started=new AtomicBoolean(false); protected AtomicBoolean started=new AtomicBoolean(false);
protected JmsMesageConvertor jmsMessageConvertor; protected JmsMesageConvertor jmsMessageConvertor;
protected boolean doHandleReplyTo = true; protected boolean doHandleReplyTo = true;
protected JmsConnector jmsConnector;
/** /**
* @return Returns the consumer. * @return Returns the consumer.
@ -55,6 +57,12 @@ abstract class DestinationBridge implements Service,MessageListener{
this.consumer=consumer; this.consumer=consumer;
} }
/**
* @param connector
*/
public void setJmsConnector(JmsConnector connector){
this.jmsConnector = connector;
}
/** /**
* @return Returns the inboundMessageConvertor. * @return Returns the inboundMessageConvertor.
*/ */
@ -63,13 +71,17 @@ abstract class DestinationBridge implements Service,MessageListener{
} }
/** /**
* @param inboundMessageConvertor * @param jmsMessageConvertor
* The inboundMessageConvertor to set.
*/ */
public void setJmsMessageConvertor(JmsMesageConvertor jmsMessageConvertor){ public void setJmsMessageConvertor(JmsMesageConvertor jmsMessageConvertor){
this.jmsMessageConvertor=jmsMessageConvertor; this.jmsMessageConvertor=jmsMessageConvertor;
} }
protected Destination processReplyToDestination (Destination destination){
return jmsConnector.createReplyToBridge(destination, getConsumerConnection(), getProducerConnection());
}
public void start() throws Exception{ public void start() throws Exception{
if(started.compareAndSet(false,true)){ if(started.compareAndSet(false,true)){
MessageConsumer consumer=createConsumer(); MessageConsumer consumer=createConsumer();
@ -128,7 +140,9 @@ abstract class DestinationBridge implements Service,MessageListener{
protected abstract void sendMessage(Message message) throws JMSException; protected abstract void sendMessage(Message message) throws JMSException;
protected abstract Destination processReplyToDestination(Destination destination); protected abstract Connection getConsumerConnection();
protected abstract Connection getProducerConnection();
} }

View File

@ -19,6 +19,10 @@ package org.apache.activemq.network.jms;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.Service; import org.apache.activemq.Service;
@ -44,10 +48,14 @@ public abstract class JmsConnector implements Service{
protected JmsMesageConvertor outboundMessageConvertor; protected JmsMesageConvertor outboundMessageConvertor;
private List inboundBridges = new CopyOnWriteArrayList(); private List inboundBridges = new CopyOnWriteArrayList();
private List outboundBridges = new CopyOnWriteArrayList(); private List outboundBridges = new CopyOnWriteArrayList();
protected int replyToDestinationCacheSize=10000;
protected AtomicBoolean initialized = new AtomicBoolean(false); protected AtomicBoolean initialized = new AtomicBoolean(false);
protected AtomicBoolean started = new AtomicBoolean(false); protected AtomicBoolean started = new AtomicBoolean(false);
protected ActiveMQConnectionFactory embeddedConnectionFactory; protected ActiveMQConnectionFactory embeddedConnectionFactory;
protected int replyToDestinationCacheSize=10000;
protected String outboundUsername;
protected String outboundPassword;
protected String localUsername;
protected String localPassword;
protected LRUCache replyToBridges=new LRUCache(){ protected LRUCache replyToBridges=new LRUCache(){
protected boolean removeEldestEntry(Map.Entry enty){ protected boolean removeEldestEntry(Map.Entry enty){
if(size()>maxCacheSize){ if(size()>maxCacheSize){
@ -113,6 +121,8 @@ public abstract class JmsConnector implements Service{
} }
} }
protected abstract Destination createReplyToBridge(Destination destination, Connection consumerConnection, Connection producerConnection);
/** /**
* One way to configure the local connection - this is called by * One way to configure the local connection - this is called by
* The BrokerService when the Connector is embedded * The BrokerService when the Connector is embedded
@ -196,6 +206,62 @@ public abstract class JmsConnector implements Service{
} }
/**
* @return Returns the localPassword.
*/
public String getLocalPassword(){
return localPassword;
}
/**
* @param localPassword The localPassword to set.
*/
public void setLocalPassword(String localPassword){
this.localPassword=localPassword;
}
/**
* @return Returns the localUsername.
*/
public String getLocalUsername(){
return localUsername;
}
/**
* @param localUsername The localUsername to set.
*/
public void setLocalUsername(String localUsername){
this.localUsername=localUsername;
}
/**
* @return Returns the outboundPassword.
*/
public String getOutboundPassword(){
return outboundPassword;
}
/**
* @param outboundPassword The outboundPassword to set.
*/
public void setOutboundPassword(String outboundPassword){
this.outboundPassword=outboundPassword;
}
/**
* @return Returns the outboundUsername.
*/
public String getOutboundUsername(){
return outboundUsername;
}
/**
* @param outboundUsername The outboundUsername to set.
*/
public void setOutboundUsername(String outboundUsername){
this.outboundUsername=outboundUsername;
}
protected void addInboundBridge(DestinationBridge bridge){ protected void addInboundBridge(DestinationBridge bridge){
inboundBridges.add(bridge); inboundBridges.add(bridge);
} }

View File

@ -19,6 +19,7 @@ package org.apache.activemq.network.jms;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import javax.jms.Connection;
import javax.jms.Destination; import javax.jms.Destination;
import javax.jms.JMSException; import javax.jms.JMSException;
import javax.jms.Queue; import javax.jms.Queue;
@ -44,10 +45,7 @@ public class JmsQueueConnector extends JmsConnector{
private QueueConnection localQueueConnection; private QueueConnection localQueueConnection;
private InboundQueueBridge[] inboundQueueBridges; private InboundQueueBridge[] inboundQueueBridges;
private OutboundQueueBridge[] outboundQueueBridges; private OutboundQueueBridge[] outboundQueueBridges;
private String outboundUsername;
private String outboundPassword;
private String localUsername;
private String localPassword;
@ -189,79 +187,6 @@ public class JmsQueueConnector extends JmsConnector{
this.outboundQueueConnectionFactory=foreignQueueConnectionFactory; this.outboundQueueConnectionFactory=foreignQueueConnectionFactory;
} }
/**
* @return Returns the outboundPassword.
*/
public String getOutboundPassword(){
return outboundPassword;
}
/**
* @param outboundPassword
* The outboundPassword to set.
*/
public void setOutboundPassword(String foreignPassword){
this.outboundPassword=foreignPassword;
}
/**
* @return Returns the outboundUsername.
*/
public String getOutboundUsername(){
return outboundUsername;
}
/**
* @param outboundUsername
* The outboundUsername to set.
*/
public void setOutboundUsername(String foreignUsername){
this.outboundUsername=foreignUsername;
}
/**
* @return Returns the localPassword.
*/
public String getLocalPassword(){
return localPassword;
}
/**
* @param localPassword
* The localPassword to set.
*/
public void setLocalPassword(String localPassword){
this.localPassword=localPassword;
}
/**
* @return Returns the localUsername.
*/
public String getLocalUsername(){
return localUsername;
}
/**
* @param localUsername
* The localUsername to set.
*/
public void setLocalUsername(String localUsername){
this.localUsername=localUsername;
}
/**
* @return Returns the replyToDestinationCacheSize.
*/
public int getReplyToDestinationCacheSize(){
return replyToDestinationCacheSize;
}
/**
* @param replyToDestinationCacheSize The replyToDestinationCacheSize to set.
*/
public void setReplyToDestinationCacheSize(int temporaryQueueCacheSize){
this.replyToDestinationCacheSize=temporaryQueueCacheSize;
}
protected void initializeForeignQueueConnection() throws NamingException,JMSException{ protected void initializeForeignQueueConnection() throws NamingException,JMSException{
if(outboundQueueConnection==null){ if(outboundQueueConnection==null){
@ -341,7 +266,7 @@ public class JmsQueueConnector extends JmsConnector{
if(bridge.getJmsMessageConvertor()==null){ if(bridge.getJmsMessageConvertor()==null){
bridge.setJmsMessageConvertor(getInboundMessageConvertor()); bridge.setJmsMessageConvertor(getInboundMessageConvertor());
} }
bridge.setJmsQueueConnector(this); bridge.setJmsConnector(this);
addInboundBridge(bridge); addInboundBridge(bridge);
} }
outboundSession.close(); outboundSession.close();
@ -366,7 +291,7 @@ public class JmsQueueConnector extends JmsConnector{
if(bridge.getJmsMessageConvertor()==null){ if(bridge.getJmsMessageConvertor()==null){
bridge.setJmsMessageConvertor(getOutboundMessageConvertor()); bridge.setJmsMessageConvertor(getOutboundMessageConvertor());
} }
bridge.setJmsQueueConnector(this); bridge.setJmsConnector(this);
addOutboundBridge(bridge); addOutboundBridge(bridge);
} }
outboundSession.close(); outboundSession.close();
@ -374,7 +299,8 @@ public class JmsQueueConnector extends JmsConnector{
} }
} }
protected Destination createReplyToQueueBridge(Queue queue, QueueConnection consumerConnection, QueueConnection producerConnection){ protected Destination createReplyToBridge(Destination destination, Connection consumerConnection, Connection producerConnection){
Queue queue = (Queue)destination;
OutboundQueueBridge bridge = (OutboundQueueBridge) replyToBridges.get(queue); OutboundQueueBridge bridge = (OutboundQueueBridge) replyToBridges.get(queue);
if (bridge == null){ if (bridge == null){
bridge = new OutboundQueueBridge(){ bridge = new OutboundQueueBridge(){
@ -395,7 +321,7 @@ public class JmsQueueConnector extends JmsConnector{
if(bridge.getJmsMessageConvertor()==null){ if(bridge.getJmsMessageConvertor()==null){
bridge.setJmsMessageConvertor(getOutboundMessageConvertor()); bridge.setJmsMessageConvertor(getOutboundMessageConvertor());
} }
bridge.setJmsQueueConnector(this); bridge.setJmsConnector(this);
bridge.start(); bridge.start();
log.info("Created replyTo bridge for " + queue); log.info("Created replyTo bridge for " + queue);
}catch(Exception e){ }catch(Exception e){

View File

@ -16,6 +16,7 @@
*/ */
package org.apache.activemq.network.jms; package org.apache.activemq.network.jms;
import javax.jms.Connection;
import javax.jms.Destination; import javax.jms.Destination;
import javax.jms.JMSException; import javax.jms.JMSException;
import javax.jms.Session; import javax.jms.Session;
@ -44,10 +45,7 @@ public class JmsTopicConnector extends JmsConnector{
private TopicConnection localTopicConnection; private TopicConnection localTopicConnection;
private InboundTopicBridge[] inboundTopicBridges; private InboundTopicBridge[] inboundTopicBridges;
private OutboundTopicBridge[] outboundTopicBridges; private OutboundTopicBridge[] outboundTopicBridges;
private String outboundUsername;
private String outboundPassword;
private String localUsername;
private String localPassword;
@ -189,79 +187,7 @@ public class JmsTopicConnector extends JmsConnector{
this.outboundTopicConnectionFactory=foreignTopicConnectionFactory; this.outboundTopicConnectionFactory=foreignTopicConnectionFactory;
} }
/**
* @return Returns the outboundPassword.
*/
public String getOutboundPassword(){
return outboundPassword;
}
/**
* @param outboundPassword
* The outboundPassword to set.
*/
public void setOutboundPassword(String foreignPassword){
this.outboundPassword=foreignPassword;
}
/**
* @return Returns the outboundUsername.
*/
public String getOutboundUsername(){
return outboundUsername;
}
/**
* @param outboundUsername
* The outboundUsername to set.
*/
public void setOutboundUsername(String foreignUsername){
this.outboundUsername=foreignUsername;
}
/**
* @return Returns the localPassword.
*/
public String getLocalPassword(){
return localPassword;
}
/**
* @param localPassword
* The localPassword to set.
*/
public void setLocalPassword(String localPassword){
this.localPassword=localPassword;
}
/**
* @return Returns the localUsername.
*/
public String getLocalUsername(){
return localUsername;
}
/**
* @param localUsername
* The localUsername to set.
*/
public void setLocalUsername(String localUsername){
this.localUsername=localUsername;
}
/**
* @return Returns the replyToDestinationCacheSize.
*/
public int getReplyToDestinationCacheSize(){
return replyToDestinationCacheSize;
}
/**
* @param replyToDestinationCacheSize The replyToDestinationCacheSize to set.
*/
public void setReplyToDestinationCacheSize(int temporaryTopicCacheSize){
this.replyToDestinationCacheSize=temporaryTopicCacheSize;
}
protected void initializeForeignTopicConnection() throws NamingException,JMSException{ protected void initializeForeignTopicConnection() throws NamingException,JMSException{
if(outboundTopicConnection==null){ if(outboundTopicConnection==null){
@ -341,7 +267,7 @@ public class JmsTopicConnector extends JmsConnector{
if(bridge.getJmsMessageConvertor()==null){ if(bridge.getJmsMessageConvertor()==null){
bridge.setJmsMessageConvertor(getInboundMessageConvertor()); bridge.setJmsMessageConvertor(getInboundMessageConvertor());
} }
bridge.setJmsTopicConnector(this); bridge.setJmsConnector(this);
addInboundBridge(bridge); addInboundBridge(bridge);
} }
outboundSession.close(); outboundSession.close();
@ -366,7 +292,7 @@ public class JmsTopicConnector extends JmsConnector{
if(bridge.getJmsMessageConvertor()==null){ if(bridge.getJmsMessageConvertor()==null){
bridge.setJmsMessageConvertor(getOutboundMessageConvertor()); bridge.setJmsMessageConvertor(getOutboundMessageConvertor());
} }
bridge.setJmsTopicConnector(this); bridge.setJmsConnector(this);
addOutboundBridge(bridge); addOutboundBridge(bridge);
} }
outboundSession.close(); outboundSession.close();
@ -374,7 +300,9 @@ public class JmsTopicConnector extends JmsConnector{
} }
} }
protected Destination createReplyToTopicBridge(Topic topic, TopicConnection consumerConnection, TopicConnection producerConnection){ protected Destination createReplyToBridge(Destination destination, Connection consumerConnection, Connection producerConnection){
Topic topic =(Topic)destination;
OutboundTopicBridge bridge = (OutboundTopicBridge) replyToBridges.get(topic); OutboundTopicBridge bridge = (OutboundTopicBridge) replyToBridges.get(topic);
if (bridge == null){ if (bridge == null){
bridge = new OutboundTopicBridge(){ bridge = new OutboundTopicBridge(){
@ -395,7 +323,7 @@ public class JmsTopicConnector extends JmsConnector{
if(bridge.getJmsMessageConvertor()==null){ if(bridge.getJmsMessageConvertor()==null){
bridge.setJmsMessageConvertor(getOutboundMessageConvertor()); bridge.setJmsMessageConvertor(getOutboundMessageConvertor());
} }
bridge.setJmsTopicConnector(this); bridge.setJmsConnector(this);
bridge.start(); bridge.start();
log.info("Created replyTo bridge for " + topic); log.info("Created replyTo bridge for " + topic);
}catch(Exception e){ }catch(Exception e){

View File

@ -42,7 +42,7 @@ class QueueBridge extends DestinationBridge{
protected QueueSender producer; protected QueueSender producer;
protected QueueConnection consumerConnection; protected QueueConnection consumerConnection;
protected QueueConnection producerConnection; protected QueueConnection producerConnection;
protected JmsQueueConnector jmsQueueConnector;
public void stop() throws Exception{ public void stop() throws Exception{
super.stop(); super.stop();
@ -54,9 +54,7 @@ class QueueBridge extends DestinationBridge{
} }
} }
protected void setJmsQueueConnector(JmsQueueConnector connector){
this.jmsQueueConnector = connector;
}
protected MessageConsumer createConsumer() throws JMSException{ protected MessageConsumer createConsumer() throws JMSException{
// set up the consumer // set up the consumer
@ -79,11 +77,6 @@ class QueueBridge extends DestinationBridge{
} }
protected Destination processReplyToDestination (Destination destination){
Queue queue = (Queue)destination;
return jmsQueueConnector.createReplyToQueueBridge(queue, getConsumerConnection(), getProducerConnection());
}
protected void sendMessage(Message message) throws JMSException{ protected void sendMessage(Message message) throws JMSException{

View File

@ -41,7 +41,7 @@ class TopicBridge extends DestinationBridge{
protected TopicPublisher producer; protected TopicPublisher producer;
protected TopicConnection consumerConnection; protected TopicConnection consumerConnection;
protected TopicConnection producerConnection; protected TopicConnection producerConnection;
protected JmsTopicConnector jmsTopicConnector;
public void stop() throws Exception{ public void stop() throws Exception{
super.stop(); super.stop();
@ -53,9 +53,7 @@ class TopicBridge extends DestinationBridge{
} }
} }
protected void setJmsTopicConnector(JmsTopicConnector connector){
this.jmsTopicConnector = connector;
}
protected MessageConsumer createConsumer() throws JMSException{ protected MessageConsumer createConsumer() throws JMSException{
// set up the consumer // set up the consumer
@ -78,10 +76,7 @@ class TopicBridge extends DestinationBridge{
return consumer; return consumer;
} }
protected Destination processReplyToDestination (Destination destination){
Topic topic = (Topic)destination;
return jmsTopicConnector.createReplyToTopicBridge(topic, getConsumerConnection(), getProducerConnection());
}
protected MessageProducer createProducer() throws JMSException{ protected MessageProducer createProducer() throws JMSException{
producer = producerSession.createPublisher(null); producer = producerSession.createPublisher(null);