From 28647f1790b434837ee0bf8fe58bb1999d82155b Mon Sep 17 00:00:00 2001 From: Robert Davies Date: Wed, 12 Apr 2006 04:46:22 +0000 Subject: [PATCH] added changes from http://jira.activemq.org/jira//browse/AMQ-660 to allow destination conversation on outbound messages with replyTo destinations git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@393383 13f79535-47bb-0310-9956-ffa450edef68 --- .../network/jms/DestinationBridge.java | 53 ++++---- .../network/jms/JmsMesageConvertor.java | 5 + .../network/jms/JmsQueueConnector.java | 120 ++++++++++++------ .../network/jms/JmsTopicConnector.java | 114 +++++++++++------ .../jms/SimpleJmsMessageConvertor.java | 16 +++ 5 files changed, 204 insertions(+), 104 deletions(-) diff --git a/activemq-core/src/main/java/org/apache/activemq/network/jms/DestinationBridge.java b/activemq-core/src/main/java/org/apache/activemq/network/jms/DestinationBridge.java index ccd6be2baa..bc01a8980a 100755 --- a/activemq-core/src/main/java/org/apache/activemq/network/jms/DestinationBridge.java +++ b/activemq-core/src/main/java/org/apache/activemq/network/jms/DestinationBridge.java @@ -34,7 +34,7 @@ import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean; * * @version $Revision: 1.1.1.1 $ */ -abstract class DestinationBridge implements Service,MessageListener{ +public abstract class DestinationBridge implements Service,MessageListener{ private static final Log log=LogFactory.getLog(DestinationBridge.class); protected MessageConsumer consumer; protected AtomicBoolean started=new AtomicBoolean(false); @@ -93,32 +93,35 @@ abstract class DestinationBridge implements Service,MessageListener{ public void stop() throws Exception{ started.set(false); } - + public void onMessage(Message message){ - if(started.get()&&message!=null){ - try{ - if(doHandleReplyTo){ - Destination replyTo=message.getJMSReplyTo(); - if(replyTo!=null){ - replyTo=processReplyToDestination(replyTo); - message.setJMSReplyTo(replyTo); - } - }else { - message.setJMSReplyTo(null); - } - Message converted=jmsMessageConvertor.convert(message); - sendMessage(converted); - message.acknowledge(); - }catch(JMSException e){ - log.error("failed to forward message: "+message,e); - try{ - stop(); - }catch(Exception e1){ - log.warn("Failed to stop cleanly",e1); - } - } - } + if(started.get()&&message!=null){ + try{ + Message converted; + if(doHandleReplyTo){ + Destination replyTo = message.getJMSReplyTo(); + if(replyTo != null){ + converted = jmsMessageConvertor.convert(message, processReplyToDestination(replyTo)); + } else { + converted = jmsMessageConvertor.convert(message); + } + } else { + message.setJMSReplyTo(null); + converted = jmsMessageConvertor.convert(message); + } + sendMessage(converted); + message.acknowledge(); + }catch(JMSException e){ + log.error("failed to forward message: "+message,e); + try{ + stop(); + }catch(Exception e1){ + log.warn("Failed to stop cleanly",e1); + } + } + } } + /** * @return Returns the doHandleReplyTo. diff --git a/activemq-core/src/main/java/org/apache/activemq/network/jms/JmsMesageConvertor.java b/activemq-core/src/main/java/org/apache/activemq/network/jms/JmsMesageConvertor.java index 1b6203c274..de31f10661 100755 --- a/activemq-core/src/main/java/org/apache/activemq/network/jms/JmsMesageConvertor.java +++ b/activemq-core/src/main/java/org/apache/activemq/network/jms/JmsMesageConvertor.java @@ -16,6 +16,8 @@ */ package org.apache.activemq.network.jms; +import javax.jms.Connection; +import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; @@ -34,5 +36,8 @@ public interface JmsMesageConvertor { */ public Message convert(Message message) throws JMSException; + public Message convert(Message message, Destination replyTo) throws JMSException; + + public void setConnection(Connection connection); } \ No newline at end of file diff --git a/activemq-core/src/main/java/org/apache/activemq/network/jms/JmsQueueConnector.java b/activemq-core/src/main/java/org/apache/activemq/network/jms/JmsQueueConnector.java index 313185d45f..95ee16a042 100755 --- a/activemq-core/src/main/java/org/apache/activemq/network/jms/JmsQueueConnector.java +++ b/activemq-core/src/main/java/org/apache/activemq/network/jms/JmsQueueConnector.java @@ -16,9 +16,6 @@ */ package org.apache.activemq.network.jms; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; - import javax.jms.Connection; import javax.jms.Destination; import javax.jms.JMSException; @@ -28,6 +25,9 @@ import javax.jms.QueueConnectionFactory; import javax.jms.QueueSession; import javax.jms.Session; import javax.naming.NamingException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; /** * A Bridge to other JMS Queue providers * @@ -44,11 +44,7 @@ public class JmsQueueConnector extends JmsConnector{ private QueueConnection outboundQueueConnection; private QueueConnection localQueueConnection; private InboundQueueBridge[] inboundQueueBridges; - private OutboundQueueBridge[] outboundQueueBridges; - - - - + private OutboundQueueBridge[] outboundQueueBridges; public boolean init(){ boolean result=super.init(); @@ -56,6 +52,8 @@ public class JmsQueueConnector extends JmsConnector{ try{ initializeForeignQueueConnection(); initializeLocalQueueConnection(); + initializeInboundJmsMessageConvertor(); + initializeOutboundJmsMessageConvertor(); initializeInboundQueueBridges(); initializeOutboundQueueBridges(); }catch(Exception e){ @@ -249,6 +247,14 @@ public class JmsQueueConnector extends JmsConnector{ } localQueueConnection.start(); } + + protected void initializeInboundJmsMessageConvertor(){ + inboundMessageConvertor.setConnection(localQueueConnection); + } + + protected void initializeOutboundJmsMessageConvertor(){ + outboundMessageConvertor.setConnection(outboundQueueConnection); + } protected void initializeInboundQueueBridges() throws JMSException{ if(inboundQueueBridges!=null){ @@ -287,7 +293,6 @@ public class JmsQueueConnector extends JmsConnector{ bridge.setProducerQueue(foreignQueue); bridge.setProducerConnection(outboundQueueConnection); bridge.setConsumerConnection(localQueueConnection); - bridge.setDoHandleReplyTo(false); if(bridge.getJmsMessageConvertor()==null){ bridge.setJmsMessageConvertor(getOutboundMessageConvertor()); } @@ -299,38 +304,71 @@ public class JmsQueueConnector extends JmsConnector{ } } - protected Destination createReplyToBridge(Destination destination, Connection consumerConnection, Connection producerConnection){ - Queue queue = (Queue)destination; - OutboundQueueBridge bridge = (OutboundQueueBridge) replyToBridges.get(queue); - if (bridge == null){ - bridge = new OutboundQueueBridge(){ - //we only handle replyTo destinations - inbound - protected Destination processReplyToDestination (Destination destination){ - return null; - } - }; - try{ - QueueSession localSession = localQueueConnection.createQueueSession(false,Session.AUTO_ACKNOWLEDGE); - Queue localQueue = localSession.createTemporaryQueue(); - localSession.close(); - bridge.setConsumerQueue(localQueue); - bridge.setProducerQueue(queue); - bridge.setProducerConnection(outboundQueueConnection); - bridge.setConsumerConnection(localQueueConnection); - bridge.setDoHandleReplyTo(false); - if(bridge.getJmsMessageConvertor()==null){ - bridge.setJmsMessageConvertor(getOutboundMessageConvertor()); - } - bridge.setJmsConnector(this); - bridge.start(); - log.info("Created replyTo bridge for " + queue); - }catch(Exception e){ - log.error("Failed to create replyTo bridge for queue: " + queue,e); - return null; - } - replyToBridges.put(queue, bridge); - } - return bridge.getConsumerQueue(); + protected Destination createReplyToBridge(Destination destination, Connection replyToProducerConnection, Connection replyToConsumerConnection){ + Queue replyToProducerQueue =(Queue)destination; + boolean isInbound = replyToProducerConnection.equals(localQueueConnection); + + if(isInbound){ + InboundQueueBridge bridge = (InboundQueueBridge) replyToBridges.get(replyToProducerQueue); + if (bridge == null){ + bridge = new InboundQueueBridge(){ + protected Destination processReplyToDestination (Destination destination){ + return null; + } + }; + try{ + QueueSession replyToConsumerSession = ((QueueConnection)replyToConsumerConnection).createQueueSession(false,Session.AUTO_ACKNOWLEDGE); + Queue replyToConsumerQueue = replyToConsumerSession.createTemporaryQueue(); + replyToConsumerSession.close(); + bridge.setConsumerQueue(replyToConsumerQueue); + bridge.setProducerQueue(replyToProducerQueue); + bridge.setProducerConnection((QueueConnection)replyToProducerConnection); + bridge.setConsumerConnection((QueueConnection)replyToConsumerConnection); + bridge.setDoHandleReplyTo(false); + if(bridge.getJmsMessageConvertor()==null){ + bridge.setJmsMessageConvertor(getInboundMessageConvertor()); + } + bridge.setJmsConnector(this); + bridge.start(); + log.info("Created replyTo bridge for " + replyToProducerQueue); + }catch(Exception e){ + log.error("Failed to create replyTo bridge for queue: " + replyToProducerQueue, e); + return null; + } + replyToBridges.put(replyToProducerQueue, bridge); + } + return bridge.getConsumerQueue(); + }else{ + OutboundQueueBridge bridge = (OutboundQueueBridge) replyToBridges.get(replyToProducerQueue); + if (bridge == null){ + bridge = new OutboundQueueBridge(){ + protected Destination processReplyToDestination (Destination destination){ + return null; + } + }; + try{ + QueueSession replyToConsumerSession = ((QueueConnection)replyToConsumerConnection).createQueueSession(false,Session.AUTO_ACKNOWLEDGE); + Queue replyToConsumerQueue = replyToConsumerSession.createTemporaryQueue(); + replyToConsumerSession.close(); + bridge.setConsumerQueue(replyToConsumerQueue); + bridge.setProducerQueue(replyToProducerQueue); + bridge.setProducerConnection((QueueConnection)replyToProducerConnection); + bridge.setConsumerConnection((QueueConnection)replyToConsumerConnection); + bridge.setDoHandleReplyTo(false); + if(bridge.getJmsMessageConvertor()==null){ + bridge.setJmsMessageConvertor(getOutboundMessageConvertor()); + } + bridge.setJmsConnector(this); + bridge.start(); + log.info("Created replyTo bridge for " + replyToProducerQueue); + }catch(Exception e){ + log.error("Failed to create replyTo bridge for queue: " + replyToProducerQueue, e); + return null; + } + replyToBridges.put(replyToProducerQueue, bridge); + } + return bridge.getConsumerQueue(); + } } protected Queue createActiveMQQueue(QueueSession session,String queueName) throws JMSException{ diff --git a/activemq-core/src/main/java/org/apache/activemq/network/jms/JmsTopicConnector.java b/activemq-core/src/main/java/org/apache/activemq/network/jms/JmsTopicConnector.java index 91a04540fa..b5bbe7847a 100755 --- a/activemq-core/src/main/java/org/apache/activemq/network/jms/JmsTopicConnector.java +++ b/activemq-core/src/main/java/org/apache/activemq/network/jms/JmsTopicConnector.java @@ -25,6 +25,7 @@ import javax.jms.TopicConnection; import javax.jms.TopicConnectionFactory; import javax.jms.TopicSession; import javax.naming.NamingException; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -46,16 +47,14 @@ public class JmsTopicConnector extends JmsConnector{ private InboundTopicBridge[] inboundTopicBridges; private OutboundTopicBridge[] outboundTopicBridges; - - - - public boolean init(){ boolean result=super.init(); if(result){ try{ initializeForeignTopicConnection(); initializeLocalTopicConnection(); + initializeInboundJmsMessageConvertor(); + initializeOutboundJmsMessageConvertor(); initializeInboundTopicBridges(); initializeOutboundTopicBridges(); }catch(Exception e){ @@ -250,6 +249,14 @@ public class JmsTopicConnector extends JmsConnector{ } localTopicConnection.start(); } + + protected void initializeInboundJmsMessageConvertor(){ + inboundMessageConvertor.setConnection(localTopicConnection); + } + + protected void initializeOutboundJmsMessageConvertor(){ + outboundMessageConvertor.setConnection(outboundTopicConnection); + } protected void initializeInboundTopicBridges() throws JMSException{ if(inboundTopicBridges!=null){ @@ -288,7 +295,6 @@ public class JmsTopicConnector extends JmsConnector{ bridge.setProducerTopic(foreignTopic); bridge.setProducerConnection(outboundTopicConnection); bridge.setConsumerConnection(localTopicConnection); - bridge.setDoHandleReplyTo(false); if(bridge.getJmsMessageConvertor()==null){ bridge.setJmsMessageConvertor(getOutboundMessageConvertor()); } @@ -300,39 +306,71 @@ public class JmsTopicConnector extends JmsConnector{ } } - protected Destination createReplyToBridge(Destination destination, Connection consumerConnection, Connection producerConnection){ - Topic topic =(Topic)destination; - - OutboundTopicBridge bridge = (OutboundTopicBridge) replyToBridges.get(topic); - if (bridge == null){ - bridge = new OutboundTopicBridge(){ - //we only handle replyTo destinations - inbound - protected Destination processReplyToDestination (Destination destination){ - return null; - } - }; - try{ - TopicSession localSession = localTopicConnection.createTopicSession(false,Session.AUTO_ACKNOWLEDGE); - Topic localTopic = localSession.createTemporaryTopic(); - localSession.close(); - bridge.setConsumerTopic(localTopic); - bridge.setProducerTopic(topic); - bridge.setProducerConnection(outboundTopicConnection); - bridge.setConsumerConnection(localTopicConnection); - bridge.setDoHandleReplyTo(false); - if(bridge.getJmsMessageConvertor()==null){ - bridge.setJmsMessageConvertor(getOutboundMessageConvertor()); - } - bridge.setJmsConnector(this); - bridge.start(); - log.info("Created replyTo bridge for " + topic); - }catch(Exception e){ - log.error("Failed to create replyTo bridge for topic: " + topic,e); - return null; - } - replyToBridges.put(topic, bridge); - } - return bridge.getConsumerTopic(); + protected Destination createReplyToBridge(Destination destination, Connection replyToProducerConnection, Connection replyToConsumerConnection){ + Topic replyToProducerTopic =(Topic)destination; + boolean isInbound = replyToProducerConnection.equals(localTopicConnection); + + if(isInbound){ + InboundTopicBridge bridge = (InboundTopicBridge) replyToBridges.get(replyToProducerTopic); + if (bridge == null){ + bridge = new InboundTopicBridge(){ + protected Destination processReplyToDestination (Destination destination){ + return null; + } + }; + try{ + TopicSession replyToConsumerSession = ((TopicConnection)replyToConsumerConnection).createTopicSession(false,Session.AUTO_ACKNOWLEDGE); + Topic replyToConsumerTopic = replyToConsumerSession.createTemporaryTopic(); + replyToConsumerSession.close(); + bridge.setConsumerTopic(replyToConsumerTopic); + bridge.setProducerTopic(replyToProducerTopic); + bridge.setProducerConnection((TopicConnection)replyToProducerConnection); + bridge.setConsumerConnection((TopicConnection)replyToConsumerConnection); + bridge.setDoHandleReplyTo(false); + if(bridge.getJmsMessageConvertor()==null){ + bridge.setJmsMessageConvertor(getInboundMessageConvertor()); + } + bridge.setJmsConnector(this); + bridge.start(); + log.info("Created replyTo bridge for " + replyToProducerTopic); + }catch(Exception e){ + log.error("Failed to create replyTo bridge for topic: " + replyToProducerTopic, e); + return null; + } + replyToBridges.put(replyToProducerTopic, bridge); + } + return bridge.getConsumerTopic(); + }else{ + OutboundTopicBridge bridge = (OutboundTopicBridge) replyToBridges.get(replyToProducerTopic); + if (bridge == null){ + bridge = new OutboundTopicBridge(){ + protected Destination processReplyToDestination (Destination destination){ + return null; + } + }; + try{ + TopicSession replyToConsumerSession = ((TopicConnection)replyToConsumerConnection).createTopicSession(false,Session.AUTO_ACKNOWLEDGE); + Topic replyToConsumerTopic = replyToConsumerSession.createTemporaryTopic(); + replyToConsumerSession.close(); + bridge.setConsumerTopic(replyToConsumerTopic); + bridge.setProducerTopic(replyToProducerTopic); + bridge.setProducerConnection((TopicConnection)replyToProducerConnection); + bridge.setConsumerConnection((TopicConnection)replyToConsumerConnection); + bridge.setDoHandleReplyTo(false); + if(bridge.getJmsMessageConvertor()==null){ + bridge.setJmsMessageConvertor(getOutboundMessageConvertor()); + } + bridge.setJmsConnector(this); + bridge.start(); + log.info("Created replyTo bridge for " + replyToProducerTopic); + }catch(Exception e){ + log.error("Failed to create replyTo bridge for topic: " + replyToProducerTopic, e); + return null; + } + replyToBridges.put(replyToProducerTopic, bridge); + } + return bridge.getConsumerTopic(); + } } protected Topic createActiveMQTopic(TopicSession session,String topicName) throws JMSException{ diff --git a/activemq-core/src/main/java/org/apache/activemq/network/jms/SimpleJmsMessageConvertor.java b/activemq-core/src/main/java/org/apache/activemq/network/jms/SimpleJmsMessageConvertor.java index 796057a295..0b964c586c 100755 --- a/activemq-core/src/main/java/org/apache/activemq/network/jms/SimpleJmsMessageConvertor.java +++ b/activemq-core/src/main/java/org/apache/activemq/network/jms/SimpleJmsMessageConvertor.java @@ -16,6 +16,8 @@ */ package org.apache.activemq.network.jms; +import javax.jms.Connection; +import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; @@ -39,5 +41,19 @@ public class SimpleJmsMessageConvertor implements JmsMesageConvertor { return message; } + public Message convert(Message message, Destination replyTo) throws JMSException{ + Message msg = convert(message); + if(replyTo != null) { + msg.setJMSReplyTo(replyTo); + }else{ + msg.setJMSReplyTo(null); + } + return msg; + } + + public void setConnection(Connection connection){ + //do nothing + } + } \ No newline at end of file