diff --git a/activemq-core/src/main/java/org/apache/activemq/network/jms/DestinationBridge.java b/activemq-core/src/main/java/org/apache/activemq/network/jms/DestinationBridge.java index 3766826d25..76abb1ded9 100755 --- a/activemq-core/src/main/java/org/apache/activemq/network/jms/DestinationBridge.java +++ b/activemq-core/src/main/java/org/apache/activemq/network/jms/DestinationBridge.java @@ -17,6 +17,12 @@ */ package org.apache.activemq.network.jms; +import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.activemq.Service; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + import javax.jms.Connection; import javax.jms.Destination; import javax.jms.JMSException; @@ -24,29 +30,26 @@ import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.MessageProducer; +import javax.naming.NamingException; -import org.apache.activemq.Service; -import org.apache.activemq.command.ActiveMQMessage; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean; /** * A Destination bridge is used to bridge between to different JMS systems * * @version $Revision: 1.1.1.1 $ */ -public abstract class DestinationBridge implements Service,MessageListener{ - private static final Log log=LogFactory.getLog(DestinationBridge.class); +public abstract class DestinationBridge implements Service, MessageListener { + private static final Log log = LogFactory.getLog(DestinationBridge.class); protected MessageConsumer consumer; - protected AtomicBoolean started=new AtomicBoolean(false); + protected AtomicBoolean started = new AtomicBoolean(false); protected JmsMesageConvertor jmsMessageConvertor; protected boolean doHandleReplyTo = true; protected JmsConnector jmsConnector; + private int maximumRetries = 10; /** * @return Returns the consumer. */ - public MessageConsumer getConsumer(){ + public MessageConsumer getConsumer() { return consumer; } @@ -54,88 +57,110 @@ public abstract class DestinationBridge implements Service,MessageListener{ * @param consumer * The consumer to set. */ - public void setConsumer(MessageConsumer consumer){ - this.consumer=consumer; + public void setConsumer(MessageConsumer consumer) { + this.consumer = consumer; } /** * @param connector */ - public void setJmsConnector(JmsConnector connector){ + public void setJmsConnector(JmsConnector connector) { this.jmsConnector = connector; } + /** * @return Returns the inboundMessageConvertor. */ - public JmsMesageConvertor getJmsMessageConvertor(){ + public JmsMesageConvertor getJmsMessageConvertor() { return jmsMessageConvertor; } /** - * @param jmsMessageConvertor + * @param jmsMessageConvertor */ - public void setJmsMessageConvertor(JmsMesageConvertor jmsMessageConvertor){ - this.jmsMessageConvertor=jmsMessageConvertor; + public void setJmsMessageConvertor(JmsMesageConvertor jmsMessageConvertor) { + this.jmsMessageConvertor = jmsMessageConvertor; } - - protected Destination processReplyToDestination (Destination destination){ + public int getMaximumRetries() { + return maximumRetries; + } + + /** + * Sets the maximum number of retries if a send fails before closing the + * bridge + */ + public void setMaximumRetries(int maximumRetries) { + this.maximumRetries = maximumRetries; + } + + protected Destination processReplyToDestination(Destination destination) { return jmsConnector.createReplyToBridge(destination, getConnnectionForConsumer(), getConnectionForProducer()); } - - public void start() throws Exception{ - if(started.compareAndSet(false,true)){ - MessageConsumer consumer=createConsumer(); + + public void start() throws Exception { + if (started.compareAndSet(false, true)) { + MessageConsumer consumer = createConsumer(); consumer.setMessageListener(this); createProducer(); } } - public void stop() throws Exception{ + public void stop() throws Exception { started.set(false); } - - public void onMessage(Message message){ - if(started.get()&&message!=null){ - try{ - Message converted; - if(doHandleReplyTo){ - Destination replyTo = message.getJMSReplyTo(); - if(replyTo != null){ - converted = jmsMessageConvertor.convert(message, processReplyToDestination(replyTo)); - } else { - converted = jmsMessageConvertor.convert(message); - } - } else { - message.setJMSReplyTo(null); - converted = jmsMessageConvertor.convert(message); - } - sendMessage(converted); - message.acknowledge(); - }catch(JMSException e){ - log.error("failed to forward message: "+message,e); - try{ - stop(); - }catch(Exception e1){ - log.warn("Failed to stop cleanly",e1); - } - } - } + + public void onMessage(Message message) { + if (started.get() && message != null) { + int attempt = 0; + try { + if (attempt > 0) { + restartProducer(); + } + Message converted; + if (doHandleReplyTo) { + Destination replyTo = message.getJMSReplyTo(); + if (replyTo != null) { + converted = jmsMessageConvertor.convert(message, processReplyToDestination(replyTo)); + } + else { + converted = jmsMessageConvertor.convert(message); + } + } + else { + message.setJMSReplyTo(null); + converted = jmsMessageConvertor.convert(message); + } + sendMessage(converted); + message.acknowledge(); + } + catch (Exception e) { + log.error("failed to forward message on attempt: " + (++attempt) + " reason: " + e + " message: " + message, e); + if (maximumRetries > 0 && attempt >= maximumRetries) { + try { + stop(); + } + catch (Exception e1) { + log.warn("Failed to stop cleanly", e1); + } + } + } + } } - /** * @return Returns the doHandleReplyTo. */ - protected boolean isDoHandleReplyTo(){ + protected boolean isDoHandleReplyTo() { return doHandleReplyTo; } /** - * @param doHandleReplyTo The doHandleReplyTo to set. + * @param doHandleReplyTo + * The doHandleReplyTo to set. */ - protected void setDoHandleReplyTo(boolean doHandleReplyTo){ - this.doHandleReplyTo=doHandleReplyTo; + protected void setDoHandleReplyTo(boolean doHandleReplyTo) { + this.doHandleReplyTo = doHandleReplyTo; } protected abstract MessageConsumer createConsumer() throws JMSException; @@ -145,8 +170,17 @@ public abstract class DestinationBridge implements Service,MessageListener{ protected abstract void sendMessage(Message message) throws JMSException; protected abstract Connection getConnnectionForConsumer(); - + protected abstract Connection getConnectionForProducer(); - + protected void restartProducer() throws JMSException, NamingException { + try { + getConnectionForProducer().close(); + } + catch (Exception e) { + log.debug("Ignoring failure to close producer connection: " + e, e); + } + jmsConnector.restartProducerConnection(); + createProducer(); + } } \ No newline at end of file diff --git a/activemq-core/src/main/java/org/apache/activemq/network/jms/JmsConnector.java b/activemq-core/src/main/java/org/apache/activemq/network/jms/JmsConnector.java index 3952bf2fa8..621536d575 100755 --- a/activemq-core/src/main/java/org/apache/activemq/network/jms/JmsConnector.java +++ b/activemq-core/src/main/java/org/apache/activemq/network/jms/JmsConnector.java @@ -23,6 +23,8 @@ import java.util.Map; import javax.jms.Connection; import javax.jms.Destination; +import javax.jms.JMSException; +import javax.naming.NamingException; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.Service; @@ -36,14 +38,15 @@ import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArrayList; import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean; /** - * This bridge joins the gap between foreign JMS providers and ActiveMQ As some JMS providers are still only 1.0.1 - * compliant, this bridge itself aimed to be JMS 1.0.2 compliant. + * This bridge joins the gap between foreign JMS providers and ActiveMQ As some + * JMS providers are still only 1.0.1 compliant, this bridge itself aimed to be + * JMS 1.0.2 compliant. * * @version $Revision: 1.1.1.1 $ */ -public abstract class JmsConnector implements Service{ - - private static final Log log=LogFactory.getLog(JmsConnector.class); +public abstract class JmsConnector implements Service { + + private static final Log log = LogFactory.getLog(JmsConnector.class); protected JndiTemplate jndiLocalTemplate; protected JndiTemplate jndiOutboundTemplate; protected JmsMesageConvertor inboundMessageConvertor; @@ -52,101 +55,103 @@ public abstract class JmsConnector implements Service{ private List outboundBridges = new CopyOnWriteArrayList(); protected AtomicBoolean initialized = new AtomicBoolean(false); protected AtomicBoolean started = new AtomicBoolean(false); - protected ActiveMQConnectionFactory embeddedConnectionFactory; - protected int replyToDestinationCacheSize=10000; + protected ActiveMQConnectionFactory embeddedConnectionFactory; + protected int replyToDestinationCacheSize = 10000; protected String outboundUsername; protected String outboundPassword; protected String localUsername; protected String localPassword; private String name; - - protected LRUCache replyToBridges=new LRUCache(){ + + protected LRUCache replyToBridges = new LRUCache() { /** * */ private static final long serialVersionUID = -7446792754185879286L; - protected boolean removeEldestEntry(Map.Entry enty){ - if(size()>maxCacheSize){ - Iterator iter=entrySet().iterator(); - Map.Entry lru=(Map.Entry) iter.next(); + protected boolean removeEldestEntry(Map.Entry enty) { + if (size() > maxCacheSize) { + Iterator iter = entrySet().iterator(); + Map.Entry lru = (Map.Entry) iter.next(); remove(lru.getKey()); - DestinationBridge bridge=(DestinationBridge) lru.getValue(); - try{ + DestinationBridge bridge = (DestinationBridge) lru.getValue(); + try { bridge.stop(); - log.info("Expired bridge: "+bridge); - }catch(Exception e){ - log.warn("stopping expired bridge"+bridge+" caused an exception",e); + log.info("Expired bridge: " + bridge); + } + catch (Exception e) { + log.warn("stopping expired bridge" + bridge + " caused an exception", e); } } return false; } }; - public boolean init(){ - boolean result=initialized.compareAndSet(false,true); - if(result){ - if(jndiLocalTemplate==null){ - jndiLocalTemplate=new JndiTemplate(); + public boolean init() { + boolean result = initialized.compareAndSet(false, true); + if (result) { + if (jndiLocalTemplate == null) { + jndiLocalTemplate = new JndiTemplate(); } - if(jndiOutboundTemplate==null){ - jndiOutboundTemplate=new JndiTemplate(); + if (jndiOutboundTemplate == null) { + jndiOutboundTemplate = new JndiTemplate(); } - if(inboundMessageConvertor==null){ - inboundMessageConvertor=new SimpleJmsMessageConvertor(); + if (inboundMessageConvertor == null) { + inboundMessageConvertor = new SimpleJmsMessageConvertor(); } - if (outboundMessageConvertor==null){ - outboundMessageConvertor=new SimpleJmsMessageConvertor(); + if (outboundMessageConvertor == null) { + outboundMessageConvertor = new SimpleJmsMessageConvertor(); } replyToBridges.setMaxCacheSize(getReplyToDestinationCacheSize()); } return result; } - - public void start() throws Exception{ + + public void start() throws Exception { init(); - if (started.compareAndSet(false, true)){ - for(int i=0;i0){ @@ -74,6 +72,7 @@ class QueueBridge extends DestinationBridge{ } protected MessageProducer createProducer() throws JMSException{ + producerSession=producerConnection.createQueueSession(false,Session.AUTO_ACKNOWLEDGE); producer = producerSession.createSender(null); return producer; } diff --git a/activemq-core/src/main/java/org/apache/activemq/network/jms/TopicBridge.java b/activemq-core/src/main/java/org/apache/activemq/network/jms/TopicBridge.java index f80c0de4cd..9f45f3ee5e 100755 --- a/activemq-core/src/main/java/org/apache/activemq/network/jms/TopicBridge.java +++ b/activemq-core/src/main/java/org/apache/activemq/network/jms/TopicBridge.java @@ -60,7 +60,6 @@ class TopicBridge extends DestinationBridge{ protected MessageConsumer createConsumer() throws JMSException{ // set up the consumer consumerSession=consumerConnection.createTopicSession(false,Session.CLIENT_ACKNOWLEDGE); - producerSession=producerConnection.createTopicSession(false,Session.AUTO_ACKNOWLEDGE); MessageConsumer consumer=null; if(consumerName!=null&&consumerName.length()>0){ if(selector!=null&&selector.length()>0){ @@ -81,6 +80,7 @@ class TopicBridge extends DestinationBridge{ protected MessageProducer createProducer() throws JMSException{ + producerSession=producerConnection.createTopicSession(false,Session.AUTO_ACKNOWLEDGE); producer = producerSession.createPublisher(null); return producer; }