to allow destination conversation on outbound messages with replyTo destinations



git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@393383 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2006-04-12 04:46:22 +00:00
parent d964145532
commit 28647f1790
5 changed files with 204 additions and 104 deletions

View File

@ -34,7 +34,7 @@ import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
*
* @version $Revision: 1.1.1.1 $
*/
abstract class DestinationBridge implements Service,MessageListener{
public abstract class DestinationBridge implements Service,MessageListener{
private static final Log log=LogFactory.getLog(DestinationBridge.class);
protected MessageConsumer consumer;
protected AtomicBoolean started=new AtomicBoolean(false);
@ -93,32 +93,35 @@ abstract class DestinationBridge implements Service,MessageListener{
public void stop() throws Exception{
started.set(false);
}
public void onMessage(Message message){
if(started.get()&&message!=null){
try{
if(doHandleReplyTo){
Destination replyTo=message.getJMSReplyTo();
if(replyTo!=null){
replyTo=processReplyToDestination(replyTo);
message.setJMSReplyTo(replyTo);
}
}else {
message.setJMSReplyTo(null);
}
Message converted=jmsMessageConvertor.convert(message);
sendMessage(converted);
message.acknowledge();
}catch(JMSException e){
log.error("failed to forward message: "+message,e);
try{
stop();
}catch(Exception e1){
log.warn("Failed to stop cleanly",e1);
}
}
}
if(started.get()&&message!=null){
try{
Message converted;
if(doHandleReplyTo){
Destination replyTo = message.getJMSReplyTo();
if(replyTo != null){
converted = jmsMessageConvertor.convert(message, processReplyToDestination(replyTo));
} else {
converted = jmsMessageConvertor.convert(message);
}
} else {
message.setJMSReplyTo(null);
converted = jmsMessageConvertor.convert(message);
}
sendMessage(converted);
message.acknowledge();
}catch(JMSException e){
log.error("failed to forward message: "+message,e);
try{
stop();
}catch(Exception e1){
log.warn("Failed to stop cleanly",e1);
}
}
}
}
/**
* @return Returns the doHandleReplyTo.

View File

@ -16,6 +16,8 @@
*/
package org.apache.activemq.network.jms;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
@ -34,5 +36,8 @@ public interface JmsMesageConvertor {
*/
public Message convert(Message message) throws JMSException;
public Message convert(Message message, Destination replyTo) throws JMSException;
public void setConnection(Connection connection);
}

View File

@ -16,9 +16,6 @@
*/
package org.apache.activemq.network.jms;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
@ -28,6 +25,9 @@ import javax.jms.QueueConnectionFactory;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.naming.NamingException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
* A Bridge to other JMS Queue providers
*
@ -44,11 +44,7 @@ public class JmsQueueConnector extends JmsConnector{
private QueueConnection outboundQueueConnection;
private QueueConnection localQueueConnection;
private InboundQueueBridge[] inboundQueueBridges;
private OutboundQueueBridge[] outboundQueueBridges;
private OutboundQueueBridge[] outboundQueueBridges;
public boolean init(){
boolean result=super.init();
@ -56,6 +52,8 @@ public class JmsQueueConnector extends JmsConnector{
try{
initializeForeignQueueConnection();
initializeLocalQueueConnection();
initializeInboundJmsMessageConvertor();
initializeOutboundJmsMessageConvertor();
initializeInboundQueueBridges();
initializeOutboundQueueBridges();
}catch(Exception e){
@ -249,6 +247,14 @@ public class JmsQueueConnector extends JmsConnector{
}
localQueueConnection.start();
}
protected void initializeInboundJmsMessageConvertor(){
inboundMessageConvertor.setConnection(localQueueConnection);
}
protected void initializeOutboundJmsMessageConvertor(){
outboundMessageConvertor.setConnection(outboundQueueConnection);
}
protected void initializeInboundQueueBridges() throws JMSException{
if(inboundQueueBridges!=null){
@ -287,7 +293,6 @@ public class JmsQueueConnector extends JmsConnector{
bridge.setProducerQueue(foreignQueue);
bridge.setProducerConnection(outboundQueueConnection);
bridge.setConsumerConnection(localQueueConnection);
bridge.setDoHandleReplyTo(false);
if(bridge.getJmsMessageConvertor()==null){
bridge.setJmsMessageConvertor(getOutboundMessageConvertor());
}
@ -299,38 +304,71 @@ public class JmsQueueConnector extends JmsConnector{
}
}
protected Destination createReplyToBridge(Destination destination, Connection consumerConnection, Connection producerConnection){
Queue queue = (Queue)destination;
OutboundQueueBridge bridge = (OutboundQueueBridge) replyToBridges.get(queue);
if (bridge == null){
bridge = new OutboundQueueBridge(){
//we only handle replyTo destinations - inbound
protected Destination processReplyToDestination (Destination destination){
return null;
}
};
try{
QueueSession localSession = localQueueConnection.createQueueSession(false,Session.AUTO_ACKNOWLEDGE);
Queue localQueue = localSession.createTemporaryQueue();
localSession.close();
bridge.setConsumerQueue(localQueue);
bridge.setProducerQueue(queue);
bridge.setProducerConnection(outboundQueueConnection);
bridge.setConsumerConnection(localQueueConnection);
bridge.setDoHandleReplyTo(false);
if(bridge.getJmsMessageConvertor()==null){
bridge.setJmsMessageConvertor(getOutboundMessageConvertor());
}
bridge.setJmsConnector(this);
bridge.start();
log.info("Created replyTo bridge for " + queue);
}catch(Exception e){
log.error("Failed to create replyTo bridge for queue: " + queue,e);
return null;
}
replyToBridges.put(queue, bridge);
}
return bridge.getConsumerQueue();
protected Destination createReplyToBridge(Destination destination, Connection replyToProducerConnection, Connection replyToConsumerConnection){
Queue replyToProducerQueue =(Queue)destination;
boolean isInbound = replyToProducerConnection.equals(localQueueConnection);
if(isInbound){
InboundQueueBridge bridge = (InboundQueueBridge) replyToBridges.get(replyToProducerQueue);
if (bridge == null){
bridge = new InboundQueueBridge(){
protected Destination processReplyToDestination (Destination destination){
return null;
}
};
try{
QueueSession replyToConsumerSession = ((QueueConnection)replyToConsumerConnection).createQueueSession(false,Session.AUTO_ACKNOWLEDGE);
Queue replyToConsumerQueue = replyToConsumerSession.createTemporaryQueue();
replyToConsumerSession.close();
bridge.setConsumerQueue(replyToConsumerQueue);
bridge.setProducerQueue(replyToProducerQueue);
bridge.setProducerConnection((QueueConnection)replyToProducerConnection);
bridge.setConsumerConnection((QueueConnection)replyToConsumerConnection);
bridge.setDoHandleReplyTo(false);
if(bridge.getJmsMessageConvertor()==null){
bridge.setJmsMessageConvertor(getInboundMessageConvertor());
}
bridge.setJmsConnector(this);
bridge.start();
log.info("Created replyTo bridge for " + replyToProducerQueue);
}catch(Exception e){
log.error("Failed to create replyTo bridge for queue: " + replyToProducerQueue, e);
return null;
}
replyToBridges.put(replyToProducerQueue, bridge);
}
return bridge.getConsumerQueue();
}else{
OutboundQueueBridge bridge = (OutboundQueueBridge) replyToBridges.get(replyToProducerQueue);
if (bridge == null){
bridge = new OutboundQueueBridge(){
protected Destination processReplyToDestination (Destination destination){
return null;
}
};
try{
QueueSession replyToConsumerSession = ((QueueConnection)replyToConsumerConnection).createQueueSession(false,Session.AUTO_ACKNOWLEDGE);
Queue replyToConsumerQueue = replyToConsumerSession.createTemporaryQueue();
replyToConsumerSession.close();
bridge.setConsumerQueue(replyToConsumerQueue);
bridge.setProducerQueue(replyToProducerQueue);
bridge.setProducerConnection((QueueConnection)replyToProducerConnection);
bridge.setConsumerConnection((QueueConnection)replyToConsumerConnection);
bridge.setDoHandleReplyTo(false);
if(bridge.getJmsMessageConvertor()==null){
bridge.setJmsMessageConvertor(getOutboundMessageConvertor());
}
bridge.setJmsConnector(this);
bridge.start();
log.info("Created replyTo bridge for " + replyToProducerQueue);
}catch(Exception e){
log.error("Failed to create replyTo bridge for queue: " + replyToProducerQueue, e);
return null;
}
replyToBridges.put(replyToProducerQueue, bridge);
}
return bridge.getConsumerQueue();
}
}
protected Queue createActiveMQQueue(QueueSession session,String queueName) throws JMSException{

View File

@ -25,6 +25,7 @@ import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicSession;
import javax.naming.NamingException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -46,16 +47,14 @@ public class JmsTopicConnector extends JmsConnector{
private InboundTopicBridge[] inboundTopicBridges;
private OutboundTopicBridge[] outboundTopicBridges;
public boolean init(){
boolean result=super.init();
if(result){
try{
initializeForeignTopicConnection();
initializeLocalTopicConnection();
initializeInboundJmsMessageConvertor();
initializeOutboundJmsMessageConvertor();
initializeInboundTopicBridges();
initializeOutboundTopicBridges();
}catch(Exception e){
@ -250,6 +249,14 @@ public class JmsTopicConnector extends JmsConnector{
}
localTopicConnection.start();
}
protected void initializeInboundJmsMessageConvertor(){
inboundMessageConvertor.setConnection(localTopicConnection);
}
protected void initializeOutboundJmsMessageConvertor(){
outboundMessageConvertor.setConnection(outboundTopicConnection);
}
protected void initializeInboundTopicBridges() throws JMSException{
if(inboundTopicBridges!=null){
@ -288,7 +295,6 @@ public class JmsTopicConnector extends JmsConnector{
bridge.setProducerTopic(foreignTopic);
bridge.setProducerConnection(outboundTopicConnection);
bridge.setConsumerConnection(localTopicConnection);
bridge.setDoHandleReplyTo(false);
if(bridge.getJmsMessageConvertor()==null){
bridge.setJmsMessageConvertor(getOutboundMessageConvertor());
}
@ -300,39 +306,71 @@ public class JmsTopicConnector extends JmsConnector{
}
}
protected Destination createReplyToBridge(Destination destination, Connection consumerConnection, Connection producerConnection){
Topic topic =(Topic)destination;
OutboundTopicBridge bridge = (OutboundTopicBridge) replyToBridges.get(topic);
if (bridge == null){
bridge = new OutboundTopicBridge(){
//we only handle replyTo destinations - inbound
protected Destination processReplyToDestination (Destination destination){
return null;
}
};
try{
TopicSession localSession = localTopicConnection.createTopicSession(false,Session.AUTO_ACKNOWLEDGE);
Topic localTopic = localSession.createTemporaryTopic();
localSession.close();
bridge.setConsumerTopic(localTopic);
bridge.setProducerTopic(topic);
bridge.setProducerConnection(outboundTopicConnection);
bridge.setConsumerConnection(localTopicConnection);
bridge.setDoHandleReplyTo(false);
if(bridge.getJmsMessageConvertor()==null){
bridge.setJmsMessageConvertor(getOutboundMessageConvertor());
}
bridge.setJmsConnector(this);
bridge.start();
log.info("Created replyTo bridge for " + topic);
}catch(Exception e){
log.error("Failed to create replyTo bridge for topic: " + topic,e);
return null;
}
replyToBridges.put(topic, bridge);
}
return bridge.getConsumerTopic();
protected Destination createReplyToBridge(Destination destination, Connection replyToProducerConnection, Connection replyToConsumerConnection){
Topic replyToProducerTopic =(Topic)destination;
boolean isInbound = replyToProducerConnection.equals(localTopicConnection);
if(isInbound){
InboundTopicBridge bridge = (InboundTopicBridge) replyToBridges.get(replyToProducerTopic);
if (bridge == null){
bridge = new InboundTopicBridge(){
protected Destination processReplyToDestination (Destination destination){
return null;
}
};
try{
TopicSession replyToConsumerSession = ((TopicConnection)replyToConsumerConnection).createTopicSession(false,Session.AUTO_ACKNOWLEDGE);
Topic replyToConsumerTopic = replyToConsumerSession.createTemporaryTopic();
replyToConsumerSession.close();
bridge.setConsumerTopic(replyToConsumerTopic);
bridge.setProducerTopic(replyToProducerTopic);
bridge.setProducerConnection((TopicConnection)replyToProducerConnection);
bridge.setConsumerConnection((TopicConnection)replyToConsumerConnection);
bridge.setDoHandleReplyTo(false);
if(bridge.getJmsMessageConvertor()==null){
bridge.setJmsMessageConvertor(getInboundMessageConvertor());
}
bridge.setJmsConnector(this);
bridge.start();
log.info("Created replyTo bridge for " + replyToProducerTopic);
}catch(Exception e){
log.error("Failed to create replyTo bridge for topic: " + replyToProducerTopic, e);
return null;
}
replyToBridges.put(replyToProducerTopic, bridge);
}
return bridge.getConsumerTopic();
}else{
OutboundTopicBridge bridge = (OutboundTopicBridge) replyToBridges.get(replyToProducerTopic);
if (bridge == null){
bridge = new OutboundTopicBridge(){
protected Destination processReplyToDestination (Destination destination){
return null;
}
};
try{
TopicSession replyToConsumerSession = ((TopicConnection)replyToConsumerConnection).createTopicSession(false,Session.AUTO_ACKNOWLEDGE);
Topic replyToConsumerTopic = replyToConsumerSession.createTemporaryTopic();
replyToConsumerSession.close();
bridge.setConsumerTopic(replyToConsumerTopic);
bridge.setProducerTopic(replyToProducerTopic);
bridge.setProducerConnection((TopicConnection)replyToProducerConnection);
bridge.setConsumerConnection((TopicConnection)replyToConsumerConnection);
bridge.setDoHandleReplyTo(false);
if(bridge.getJmsMessageConvertor()==null){
bridge.setJmsMessageConvertor(getOutboundMessageConvertor());
}
bridge.setJmsConnector(this);
bridge.start();
log.info("Created replyTo bridge for " + replyToProducerTopic);
}catch(Exception e){
log.error("Failed to create replyTo bridge for topic: " + replyToProducerTopic, e);
return null;
}
replyToBridges.put(replyToProducerTopic, bridge);
}
return bridge.getConsumerTopic();
}
}
protected Topic createActiveMQTopic(TopicSession session,String topicName) throws JMSException{

View File

@ -16,6 +16,8 @@
*/
package org.apache.activemq.network.jms;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
@ -39,5 +41,19 @@ public class SimpleJmsMessageConvertor implements JmsMesageConvertor {
return message;
}
public Message convert(Message message, Destination replyTo) throws JMSException{
Message msg = convert(message);
if(replyTo != null) {
msg.setJMSReplyTo(replyTo);
}else{
msg.setJMSReplyTo(null);
}
return msg;
}
public void setConnection(Connection connection){
//do nothing
}
}