diff --git a/activemq-core/src/main/java/org/apache/activemq/network/jms/DestinationBridge.java b/activemq-core/src/main/java/org/apache/activemq/network/jms/DestinationBridge.java index 8dd0fb74cc..d9ac1aa2c2 100755 --- a/activemq-core/src/main/java/org/apache/activemq/network/jms/DestinationBridge.java +++ b/activemq-core/src/main/java/org/apache/activemq/network/jms/DestinationBridge.java @@ -16,6 +16,7 @@ */ package org.apache.activemq.network.jms; +import javax.jms.Connection; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; @@ -39,6 +40,7 @@ abstract class DestinationBridge implements Service,MessageListener{ protected AtomicBoolean started=new AtomicBoolean(false); protected JmsMesageConvertor jmsMessageConvertor; protected boolean doHandleReplyTo = true; + protected JmsConnector jmsConnector; /** * @return Returns the consumer. @@ -55,6 +57,12 @@ abstract class DestinationBridge implements Service,MessageListener{ this.consumer=consumer; } + /** + * @param connector + */ + public void setJmsConnector(JmsConnector connector){ + this.jmsConnector = connector; + } /** * @return Returns the inboundMessageConvertor. */ @@ -63,13 +71,17 @@ abstract class DestinationBridge implements Service,MessageListener{ } /** - * @param inboundMessageConvertor - * The inboundMessageConvertor to set. + * @param jmsMessageConvertor */ public void setJmsMessageConvertor(JmsMesageConvertor jmsMessageConvertor){ this.jmsMessageConvertor=jmsMessageConvertor; } + + protected Destination processReplyToDestination (Destination destination){ + return jmsConnector.createReplyToBridge(destination, getConsumerConnection(), getProducerConnection()); + } + public void start() throws Exception{ if(started.compareAndSet(false,true)){ MessageConsumer consumer=createConsumer(); @@ -128,7 +140,9 @@ abstract class DestinationBridge implements Service,MessageListener{ protected abstract void sendMessage(Message message) throws JMSException; - protected abstract Destination processReplyToDestination(Destination destination); + protected abstract Connection getConsumerConnection(); + + protected abstract Connection getProducerConnection(); } \ No newline at end of file diff --git a/activemq-core/src/main/java/org/apache/activemq/network/jms/JmsConnector.java b/activemq-core/src/main/java/org/apache/activemq/network/jms/JmsConnector.java index 7dec7ef567..09710a42b1 100755 --- a/activemq-core/src/main/java/org/apache/activemq/network/jms/JmsConnector.java +++ b/activemq-core/src/main/java/org/apache/activemq/network/jms/JmsConnector.java @@ -19,6 +19,10 @@ package org.apache.activemq.network.jms; import java.util.Iterator; import java.util.List; import java.util.Map; +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.Queue; +import javax.jms.QueueConnection; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.Service; @@ -44,10 +48,14 @@ public abstract class JmsConnector implements Service{ protected JmsMesageConvertor outboundMessageConvertor; private List inboundBridges = new CopyOnWriteArrayList(); private List outboundBridges = new CopyOnWriteArrayList(); - protected int replyToDestinationCacheSize=10000; protected AtomicBoolean initialized = new AtomicBoolean(false); protected AtomicBoolean started = new AtomicBoolean(false); protected ActiveMQConnectionFactory embeddedConnectionFactory; + protected int replyToDestinationCacheSize=10000; + protected String outboundUsername; + protected String outboundPassword; + protected String localUsername; + protected String localPassword; protected LRUCache replyToBridges=new LRUCache(){ protected boolean removeEldestEntry(Map.Entry enty){ if(size()>maxCacheSize){ @@ -113,6 +121,8 @@ public abstract class JmsConnector implements Service{ } } + protected abstract Destination createReplyToBridge(Destination destination, Connection consumerConnection, Connection producerConnection); + /** * One way to configure the local connection - this is called by * The BrokerService when the Connector is embedded @@ -196,6 +206,62 @@ public abstract class JmsConnector implements Service{ } + /** + * @return Returns the localPassword. + */ + public String getLocalPassword(){ + return localPassword; + } + + /** + * @param localPassword The localPassword to set. + */ + public void setLocalPassword(String localPassword){ + this.localPassword=localPassword; + } + + /** + * @return Returns the localUsername. + */ + public String getLocalUsername(){ + return localUsername; + } + + /** + * @param localUsername The localUsername to set. + */ + public void setLocalUsername(String localUsername){ + this.localUsername=localUsername; + } + + /** + * @return Returns the outboundPassword. + */ + public String getOutboundPassword(){ + return outboundPassword; + } + + /** + * @param outboundPassword The outboundPassword to set. + */ + public void setOutboundPassword(String outboundPassword){ + this.outboundPassword=outboundPassword; + } + + /** + * @return Returns the outboundUsername. + */ + public String getOutboundUsername(){ + return outboundUsername; + } + + /** + * @param outboundUsername The outboundUsername to set. + */ + public void setOutboundUsername(String outboundUsername){ + this.outboundUsername=outboundUsername; + } + protected void addInboundBridge(DestinationBridge bridge){ inboundBridges.add(bridge); } diff --git a/activemq-core/src/main/java/org/apache/activemq/network/jms/JmsQueueConnector.java b/activemq-core/src/main/java/org/apache/activemq/network/jms/JmsQueueConnector.java index 16be575a03..33f8de57b0 100755 --- a/activemq-core/src/main/java/org/apache/activemq/network/jms/JmsQueueConnector.java +++ b/activemq-core/src/main/java/org/apache/activemq/network/jms/JmsQueueConnector.java @@ -19,6 +19,7 @@ package org.apache.activemq.network.jms; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import javax.jms.Connection; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Queue; @@ -44,10 +45,7 @@ public class JmsQueueConnector extends JmsConnector{ private QueueConnection localQueueConnection; private InboundQueueBridge[] inboundQueueBridges; private OutboundQueueBridge[] outboundQueueBridges; - private String outboundUsername; - private String outboundPassword; - private String localUsername; - private String localPassword; + @@ -189,80 +187,7 @@ public class JmsQueueConnector extends JmsConnector{ this.outboundQueueConnectionFactory=foreignQueueConnectionFactory; } - /** - * @return Returns the outboundPassword. - */ - public String getOutboundPassword(){ - return outboundPassword; - } - - /** - * @param outboundPassword - * The outboundPassword to set. - */ - public void setOutboundPassword(String foreignPassword){ - this.outboundPassword=foreignPassword; - } - - /** - * @return Returns the outboundUsername. - */ - public String getOutboundUsername(){ - return outboundUsername; - } - - /** - * @param outboundUsername - * The outboundUsername to set. - */ - public void setOutboundUsername(String foreignUsername){ - this.outboundUsername=foreignUsername; - } - - /** - * @return Returns the localPassword. - */ - public String getLocalPassword(){ - return localPassword; - } - - /** - * @param localPassword - * The localPassword to set. - */ - public void setLocalPassword(String localPassword){ - this.localPassword=localPassword; - } - - /** - * @return Returns the localUsername. - */ - public String getLocalUsername(){ - return localUsername; - } - - /** - * @param localUsername - * The localUsername to set. - */ - public void setLocalUsername(String localUsername){ - this.localUsername=localUsername; - } - /** - * @return Returns the replyToDestinationCacheSize. - */ - public int getReplyToDestinationCacheSize(){ - return replyToDestinationCacheSize; - } - - /** - * @param replyToDestinationCacheSize The replyToDestinationCacheSize to set. - */ - public void setReplyToDestinationCacheSize(int temporaryQueueCacheSize){ - this.replyToDestinationCacheSize=temporaryQueueCacheSize; - } - protected void initializeForeignQueueConnection() throws NamingException,JMSException{ if(outboundQueueConnection==null){ // get the connection factories @@ -341,7 +266,7 @@ public class JmsQueueConnector extends JmsConnector{ if(bridge.getJmsMessageConvertor()==null){ bridge.setJmsMessageConvertor(getInboundMessageConvertor()); } - bridge.setJmsQueueConnector(this); + bridge.setJmsConnector(this); addInboundBridge(bridge); } outboundSession.close(); @@ -366,7 +291,7 @@ public class JmsQueueConnector extends JmsConnector{ if(bridge.getJmsMessageConvertor()==null){ bridge.setJmsMessageConvertor(getOutboundMessageConvertor()); } - bridge.setJmsQueueConnector(this); + bridge.setJmsConnector(this); addOutboundBridge(bridge); } outboundSession.close(); @@ -374,7 +299,8 @@ public class JmsQueueConnector extends JmsConnector{ } } - protected Destination createReplyToQueueBridge(Queue queue, QueueConnection consumerConnection, QueueConnection producerConnection){ + protected Destination createReplyToBridge(Destination destination, Connection consumerConnection, Connection producerConnection){ + Queue queue = (Queue)destination; OutboundQueueBridge bridge = (OutboundQueueBridge) replyToBridges.get(queue); if (bridge == null){ bridge = new OutboundQueueBridge(){ @@ -395,7 +321,7 @@ public class JmsQueueConnector extends JmsConnector{ if(bridge.getJmsMessageConvertor()==null){ bridge.setJmsMessageConvertor(getOutboundMessageConvertor()); } - bridge.setJmsQueueConnector(this); + bridge.setJmsConnector(this); bridge.start(); log.info("Created replyTo bridge for " + queue); }catch(Exception e){ diff --git a/activemq-core/src/main/java/org/apache/activemq/network/jms/JmsTopicConnector.java b/activemq-core/src/main/java/org/apache/activemq/network/jms/JmsTopicConnector.java index 1557228c7c..6bde54209d 100755 --- a/activemq-core/src/main/java/org/apache/activemq/network/jms/JmsTopicConnector.java +++ b/activemq-core/src/main/java/org/apache/activemq/network/jms/JmsTopicConnector.java @@ -16,6 +16,7 @@ */ package org.apache.activemq.network.jms; +import javax.jms.Connection; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Session; @@ -44,10 +45,7 @@ public class JmsTopicConnector extends JmsConnector{ private TopicConnection localTopicConnection; private InboundTopicBridge[] inboundTopicBridges; private OutboundTopicBridge[] outboundTopicBridges; - private String outboundUsername; - private String outboundPassword; - private String localUsername; - private String localPassword; + @@ -189,79 +187,7 @@ public class JmsTopicConnector extends JmsConnector{ this.outboundTopicConnectionFactory=foreignTopicConnectionFactory; } - /** - * @return Returns the outboundPassword. - */ - public String getOutboundPassword(){ - return outboundPassword; - } - - /** - * @param outboundPassword - * The outboundPassword to set. - */ - public void setOutboundPassword(String foreignPassword){ - this.outboundPassword=foreignPassword; - } - - /** - * @return Returns the outboundUsername. - */ - public String getOutboundUsername(){ - return outboundUsername; - } - - /** - * @param outboundUsername - * The outboundUsername to set. - */ - public void setOutboundUsername(String foreignUsername){ - this.outboundUsername=foreignUsername; - } - - /** - * @return Returns the localPassword. - */ - public String getLocalPassword(){ - return localPassword; - } - - /** - * @param localPassword - * The localPassword to set. - */ - public void setLocalPassword(String localPassword){ - this.localPassword=localPassword; - } - - /** - * @return Returns the localUsername. - */ - public String getLocalUsername(){ - return localUsername; - } - - /** - * @param localUsername - * The localUsername to set. - */ - public void setLocalUsername(String localUsername){ - this.localUsername=localUsername; - } - /** - * @return Returns the replyToDestinationCacheSize. - */ - public int getReplyToDestinationCacheSize(){ - return replyToDestinationCacheSize; - } - - /** - * @param replyToDestinationCacheSize The replyToDestinationCacheSize to set. - */ - public void setReplyToDestinationCacheSize(int temporaryTopicCacheSize){ - this.replyToDestinationCacheSize=temporaryTopicCacheSize; - } protected void initializeForeignTopicConnection() throws NamingException,JMSException{ if(outboundTopicConnection==null){ @@ -341,7 +267,7 @@ public class JmsTopicConnector extends JmsConnector{ if(bridge.getJmsMessageConvertor()==null){ bridge.setJmsMessageConvertor(getInboundMessageConvertor()); } - bridge.setJmsTopicConnector(this); + bridge.setJmsConnector(this); addInboundBridge(bridge); } outboundSession.close(); @@ -366,7 +292,7 @@ public class JmsTopicConnector extends JmsConnector{ if(bridge.getJmsMessageConvertor()==null){ bridge.setJmsMessageConvertor(getOutboundMessageConvertor()); } - bridge.setJmsTopicConnector(this); + bridge.setJmsConnector(this); addOutboundBridge(bridge); } outboundSession.close(); @@ -374,7 +300,9 @@ public class JmsTopicConnector extends JmsConnector{ } } - protected Destination createReplyToTopicBridge(Topic topic, TopicConnection consumerConnection, TopicConnection producerConnection){ + protected Destination createReplyToBridge(Destination destination, Connection consumerConnection, Connection producerConnection){ + Topic topic =(Topic)destination; + OutboundTopicBridge bridge = (OutboundTopicBridge) replyToBridges.get(topic); if (bridge == null){ bridge = new OutboundTopicBridge(){ @@ -395,7 +323,7 @@ public class JmsTopicConnector extends JmsConnector{ if(bridge.getJmsMessageConvertor()==null){ bridge.setJmsMessageConvertor(getOutboundMessageConvertor()); } - bridge.setJmsTopicConnector(this); + bridge.setJmsConnector(this); bridge.start(); log.info("Created replyTo bridge for " + topic); }catch(Exception e){ diff --git a/activemq-core/src/main/java/org/apache/activemq/network/jms/QueueBridge.java b/activemq-core/src/main/java/org/apache/activemq/network/jms/QueueBridge.java index af082001c6..2f1882eec0 100755 --- a/activemq-core/src/main/java/org/apache/activemq/network/jms/QueueBridge.java +++ b/activemq-core/src/main/java/org/apache/activemq/network/jms/QueueBridge.java @@ -42,7 +42,7 @@ class QueueBridge extends DestinationBridge{ protected QueueSender producer; protected QueueConnection consumerConnection; protected QueueConnection producerConnection; - protected JmsQueueConnector jmsQueueConnector; + public void stop() throws Exception{ super.stop(); @@ -54,9 +54,7 @@ class QueueBridge extends DestinationBridge{ } } - protected void setJmsQueueConnector(JmsQueueConnector connector){ - this.jmsQueueConnector = connector; - } + protected MessageConsumer createConsumer() throws JMSException{ // set up the consumer @@ -79,12 +77,7 @@ class QueueBridge extends DestinationBridge{ } - protected Destination processReplyToDestination (Destination destination){ - Queue queue = (Queue)destination; - return jmsQueueConnector.createReplyToQueueBridge(queue, getConsumerConnection(), getProducerConnection()); - } - - + protected void sendMessage(Message message) throws JMSException{ producer.send(producerQueue,message); diff --git a/activemq-core/src/main/java/org/apache/activemq/network/jms/TopicBridge.java b/activemq-core/src/main/java/org/apache/activemq/network/jms/TopicBridge.java index 4d6df71dd7..66315d9cc9 100755 --- a/activemq-core/src/main/java/org/apache/activemq/network/jms/TopicBridge.java +++ b/activemq-core/src/main/java/org/apache/activemq/network/jms/TopicBridge.java @@ -41,7 +41,7 @@ class TopicBridge extends DestinationBridge{ protected TopicPublisher producer; protected TopicConnection consumerConnection; protected TopicConnection producerConnection; - protected JmsTopicConnector jmsTopicConnector; + public void stop() throws Exception{ super.stop(); @@ -53,9 +53,7 @@ class TopicBridge extends DestinationBridge{ } } - protected void setJmsTopicConnector(JmsTopicConnector connector){ - this.jmsTopicConnector = connector; - } + protected MessageConsumer createConsumer() throws JMSException{ // set up the consumer @@ -78,10 +76,7 @@ class TopicBridge extends DestinationBridge{ return consumer; } - protected Destination processReplyToDestination (Destination destination){ - Topic topic = (Topic)destination; - return jmsTopicConnector.createReplyToTopicBridge(topic, getConsumerConnection(), getProducerConnection()); - } + protected MessageProducer createProducer() throws JMSException{ producer = producerSession.createPublisher(null);