From bc78238ad0a0d8f82c987b9228dca331487617a5 Mon Sep 17 00:00:00 2001 From: "Timothy A. Bish" Date: Tue, 31 Jan 2012 21:56:03 +0000 Subject: [PATCH] fix for: https://issues.apache.org/jira/browse/AMQ-3137 fix for: https://issues.apache.org/jira/browse/AMQ-2455 fix for: https://issues.apache.org/jira/browse/AMQ-3635 Adds reconnect logic and tests along with a policy class to allow for control over the reconnect process. Reconnection of both local and foreign side of the JmsConnector is supported. git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1238827 13f79535-47bb-0310-9956-ffa450edef68 --- .../network/jms/DestinationBridge.java | 93 +++-- .../network/jms/InboundQueueBridge.java | 20 +- .../network/jms/InboundTopicBridge.java | 20 +- .../activemq/network/jms/JmsConnector.java | 339 ++++++++++++++-- .../network/jms/JmsMesageConvertor.java | 26 +- .../network/jms/JmsQueueConnector.java | 215 +++++----- .../network/jms/JmsTopicConnector.java | 218 ++++++----- .../network/jms/OutboundQueueBridge.java | 18 +- .../network/jms/OutboundTopicBridge.java | 15 +- .../activemq/network/jms/QueueBridge.java | 27 +- .../network/jms/ReconnectionPolicy.java | 244 ++++++++++++ .../jms/SimpleJmsMessageConvertor.java | 23 +- .../activemq/network/jms/TopicBridge.java | 28 +- .../QueueBridgeStandaloneReconnectTest.java | 366 ++++++++++++++++++ .../jms/QueueOutboundBridgeReconnectTest.java | 338 ++++++++++++++++ .../TopicBridgeStandaloneReconnectTest.java | 363 +++++++++++++++++ .../jms/TopicOutboundBridgeReconnectTest.java | 326 ++++++++++++++++ 17 files changed, 2353 insertions(+), 326 deletions(-) create mode 100644 activemq-core/src/main/java/org/apache/activemq/network/jms/ReconnectionPolicy.java create mode 100644 activemq-core/src/test/java/org/apache/activemq/network/jms/QueueBridgeStandaloneReconnectTest.java create mode 100644 activemq-core/src/test/java/org/apache/activemq/network/jms/QueueOutboundBridgeReconnectTest.java create mode 100644 activemq-core/src/test/java/org/apache/activemq/network/jms/TopicBridgeStandaloneReconnectTest.java create mode 100644 activemq-core/src/test/java/org/apache/activemq/network/jms/TopicOutboundBridgeReconnectTest.java diff --git a/activemq-core/src/main/java/org/apache/activemq/network/jms/DestinationBridge.java b/activemq-core/src/main/java/org/apache/activemq/network/jms/DestinationBridge.java index 5099259097..ceca380b83 100755 --- a/activemq-core/src/main/java/org/apache/activemq/network/jms/DestinationBridge.java +++ b/activemq-core/src/main/java/org/apache/activemq/network/jms/DestinationBridge.java @@ -24,24 +24,22 @@ import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.MessageProducer; -import javax.naming.NamingException; import org.apache.activemq.Service; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * A Destination bridge is used to bridge between to different JMS systems - * - * */ public abstract class DestinationBridge implements Service, MessageListener { + private static final Logger LOG = LoggerFactory.getLogger(DestinationBridge.class); + protected MessageConsumer consumer; protected AtomicBoolean started = new AtomicBoolean(false); protected JmsMesageConvertor jmsMessageConvertor; protected boolean doHandleReplyTo = true; protected JmsConnector jmsConnector; - private int maximumRetries = 10; /** * @return Returns the consumer. @@ -78,26 +76,13 @@ public abstract class DestinationBridge implements Service, MessageListener { this.jmsMessageConvertor = jmsMessageConvertor; } - public int getMaximumRetries() { - return maximumRetries; - } - - /** - * Sets the maximum number of retries if a send fails before closing the - * bridge - */ - public void setMaximumRetries(int maximumRetries) { - this.maximumRetries = maximumRetries; - } - protected Destination processReplyToDestination(Destination destination) { return jmsConnector.createReplyToBridge(destination, getConnnectionForConsumer(), getConnectionForProducer()); } public void start() throws Exception { if (started.compareAndSet(false, true)) { - MessageConsumer consumer = createConsumer(); - consumer.setMessageListener(this); + createConsumer(); createProducer(); } } @@ -107,37 +92,60 @@ public abstract class DestinationBridge implements Service, MessageListener { } public void onMessage(Message message) { + int attempt = 0; - while (started.get() && message != null) { - + final int maxRetries = jmsConnector.getReconnectionPolicy().getMaxSendRetries(); + + while (started.get() && message != null && ++attempt <= maxRetries) { + try { + if (attempt > 0) { - restartProducer(); + try { + Thread.sleep(jmsConnector.getReconnectionPolicy().getNextDelay(attempt)); + } catch(InterruptedException e) { + break; + } } + Message converted; - if (doHandleReplyTo) { - Destination replyTo = message.getJMSReplyTo(); - if (replyTo != null) { - converted = jmsMessageConvertor.convert(message, processReplyToDestination(replyTo)); + if (jmsMessageConvertor != null) { + if (doHandleReplyTo) { + Destination replyTo = message.getJMSReplyTo(); + if (replyTo != null) { + converted = jmsMessageConvertor.convert(message, processReplyToDestination(replyTo)); + } else { + converted = jmsMessageConvertor.convert(message); + } } else { + message.setJMSReplyTo(null); converted = jmsMessageConvertor.convert(message); } } else { - message.setJMSReplyTo(null); - converted = jmsMessageConvertor.convert(message); + // The Producer side is not up or not yet configured, retry. + continue; } - sendMessage(converted); - message.acknowledge(); + + try { + sendMessage(converted); + } catch(Exception e) { + jmsConnector.handleConnectionFailure(getConnectionForProducer()); + continue; + } + + try { + message.acknowledge(); + } catch(Exception e) { + jmsConnector.handleConnectionFailure(getConnnectionForConsumer()); + continue; + } + + // if we got here then it made it out and was ack'd return; + } catch (Exception e) { - LOG.error("failed to forward message on attempt: " + (++attempt) + " reason: " + e + " message: " + message, e); - if (maximumRetries > 0 && attempt >= maximumRetries) { - try { - stop(); - } catch (Exception e1) { - LOG.warn("Failed to stop cleanly", e1); - } - } + LOG.info("failed to forward message on attempt: " + attempt + + " reason: " + e + " message: " + message, e); } } } @@ -166,15 +174,4 @@ public abstract class DestinationBridge implements Service, MessageListener { protected abstract Connection getConnectionForProducer(); - protected void restartProducer() throws JMSException, NamingException { - try { - //don't reconnect immediately - Thread.sleep(1000); - getConnectionForProducer().close(); - } catch (Exception e) { - LOG.debug("Ignoring failure to close producer connection: " + e, e); - } - jmsConnector.restartProducerConnection(); - createProducer(); - } } diff --git a/activemq-core/src/main/java/org/apache/activemq/network/jms/InboundQueueBridge.java b/activemq-core/src/main/java/org/apache/activemq/network/jms/InboundQueueBridge.java index 5909e5afdf..8f01b4c181 100755 --- a/activemq-core/src/main/java/org/apache/activemq/network/jms/InboundQueueBridge.java +++ b/activemq-core/src/main/java/org/apache/activemq/network/jms/InboundQueueBridge.java @@ -17,11 +17,12 @@ package org.apache.activemq.network.jms; /** - * Create an Inbound Queue Bridge - * + * Create an Inbound Queue Bridge. By default this class uses the sname name for + * both the inbound and outbound queue. This behavior can be overridden however + * by using the setter methods to configure both the inbound and outboud queue names + * separately. + * * @org.apache.xbean.XBean - * - * */ public class InboundQueueBridge extends QueueBridge { @@ -29,8 +30,8 @@ public class InboundQueueBridge extends QueueBridge { String localQueueName; /** - * Constructor that takes a foriegn destination as an argument - * + * Constructor that takes a foreign destination as an argument + * * @param inboundQueueName */ public InboundQueueBridge(String inboundQueueName) { @@ -39,7 +40,7 @@ public class InboundQueueBridge extends QueueBridge { } /** - * Default Contructor + * Default Constructor */ public InboundQueueBridge() { } @@ -52,6 +53,10 @@ public class InboundQueueBridge extends QueueBridge { } /** + * Sets the queue name used for the inbound queue, if the outbound queue + * name has not been set, then this method uses the same name to configure + * the outbound queue name. + * * @param inboundQueueName The inboundQueueName to set. */ public void setInboundQueueName(String inboundQueueName) { @@ -74,5 +79,4 @@ public class InboundQueueBridge extends QueueBridge { public void setLocalQueueName(String localQueueName) { this.localQueueName = localQueueName; } - } diff --git a/activemq-core/src/main/java/org/apache/activemq/network/jms/InboundTopicBridge.java b/activemq-core/src/main/java/org/apache/activemq/network/jms/InboundTopicBridge.java index 4b8a63d3ef..8d8af69863 100755 --- a/activemq-core/src/main/java/org/apache/activemq/network/jms/InboundTopicBridge.java +++ b/activemq-core/src/main/java/org/apache/activemq/network/jms/InboundTopicBridge.java @@ -17,11 +17,12 @@ package org.apache.activemq.network.jms; /** - * Create an Inbound Topic Bridge - * + * Create an Inbound Topic Bridge. By default this class uses the topic name for + * both the inbound and outbound topic. This behavior can be overridden however + * by using the setter methods to configure both the inbound and outboud topic names + * separately. + * * @org.apache.xbean.XBean - * - * */ public class InboundTopicBridge extends TopicBridge { @@ -29,8 +30,8 @@ public class InboundTopicBridge extends TopicBridge { String localTopicName; /** - * Constructor that takes a foriegn destination as an argument - * + * Constructor that takes a foreign destination as an argument + * * @param inboundTopicName */ public InboundTopicBridge(String inboundTopicName) { @@ -39,7 +40,7 @@ public class InboundTopicBridge extends TopicBridge { } /** - * Default Contructor + * Default Constructor */ public InboundTopicBridge() { } @@ -52,6 +53,10 @@ public class InboundTopicBridge extends TopicBridge { } /** + * Sets the topic name used for the inbound topic, if the outbound topic + * name has not been set, then this method uses the same name to configure + * the outbound topic name. + * * @param inboundTopicName */ public void setInboundTopicName(String inboundTopicName) { @@ -74,5 +79,4 @@ public class InboundTopicBridge extends TopicBridge { public void setLocalTopicName(String localTopicName) { this.localTopicName = localTopicName; } - } diff --git a/activemq-core/src/main/java/org/apache/activemq/network/jms/JmsConnector.java b/activemq-core/src/main/java/org/apache/activemq/network/jms/JmsConnector.java index 87562822a2..ee1c20c700 100755 --- a/activemq-core/src/main/java/org/apache/activemq/network/jms/JmsConnector.java +++ b/activemq-core/src/main/java/org/apache/activemq/network/jms/JmsConnector.java @@ -20,12 +20,16 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; import javax.jms.Connection; import javax.jms.Destination; -import javax.jms.JMSException; -import javax.naming.NamingException; +import javax.jms.QueueConnection; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.Service; @@ -37,22 +41,25 @@ import org.springframework.jndi.JndiTemplate; /** * This bridge joins the gap between foreign JMS providers and ActiveMQ As some - * JMS providers are still only 1.0.1 compliant, this bridge itself aimed to be - * JMS 1.0.2 compliant. - * - * + * JMS providers are still only in compliance with JMS v1.0.1 , this bridge itself + * aimed to be in compliance with the JMS 1.0.2 specification. */ public abstract class JmsConnector implements Service { private static int nextId; private static final Logger LOG = LoggerFactory.getLogger(JmsConnector.class); - + protected JndiTemplate jndiLocalTemplate; protected JndiTemplate jndiOutboundTemplate; protected JmsMesageConvertor inboundMessageConvertor; protected JmsMesageConvertor outboundMessageConvertor; protected AtomicBoolean initialized = new AtomicBoolean(false); + protected AtomicBoolean localSideInitialized = new AtomicBoolean(false); + protected AtomicBoolean foreignSideInitialized = new AtomicBoolean(false); protected AtomicBoolean started = new AtomicBoolean(false); + protected AtomicBoolean failed = new AtomicBoolean(); + protected AtomicReference foreignConnection = new AtomicReference(); + protected AtomicReference localConnection = new AtomicReference(); protected ActiveMQConnectionFactory embeddedConnectionFactory; protected int replyToDestinationCacheSize = 10000; protected String outboundUsername; @@ -61,21 +68,22 @@ public abstract class JmsConnector implements Service { protected String localPassword; protected String outboundClientId; protected String localClientId; - protected LRUCache replyToBridges = createLRUCache(); + protected LRUCache replyToBridges = createLRUCache(); + private ReconnectionPolicy policy = new ReconnectionPolicy(); + protected ThreadPoolExecutor connectionSerivce; private List inboundBridges = new CopyOnWriteArrayList(); private List outboundBridges = new CopyOnWriteArrayList(); private String name; - - private static LRUCache createLRUCache() { - return new LRUCache() { + private static LRUCache createLRUCache() { + return new LRUCache() { private static final long serialVersionUID = -7446792754185879286L; - protected boolean removeEldestEntry(Map.Entry enty) { + protected boolean removeEldestEntry(Map.Entry enty) { if (size() > maxCacheSize) { - Iterator iter = entrySet().iterator(); - Map.Entry lru = (Map.Entry)iter.next(); + Iterator> iter = entrySet().iterator(); + Map.Entry lru = iter.next(); remove(lru.getKey()); DestinationBridge bridge = (DestinationBridge)lru.getValue(); try { @@ -90,8 +98,6 @@ public abstract class JmsConnector implements Service { }; } - /** - */ public boolean init() { boolean result = initialized.compareAndSet(false, true); if (result) { @@ -108,19 +114,49 @@ public abstract class JmsConnector implements Service { outboundMessageConvertor = new SimpleJmsMessageConvertor(); } replyToBridges.setMaxCacheSize(getReplyToDestinationCacheSize()); + + connectionSerivce = createExecutor(); + + // Subclasses can override this to customize their own it. + result = doConnectorInit(); } return result; } + protected boolean doConnectorInit() { + + // We try to make a connection via a sync call first so that the + // JmsConnector is fully initialized before the start call returns + // in order to avoid missing any messages that are dispatched + // immediately after startup. If either side fails we queue an + // asynchronous task to manage the reconnect attempts. + + try { + initializeLocalConnection(); + localSideInitialized.set(true); + } catch(Exception e) { + // Queue up the task to attempt the local connection. + scheduleAsyncLocalConnectionReconnect(); + } + + try { + initializeForeignConnection(); + foreignSideInitialized.set(true); + } catch(Exception e) { + // Queue up the task for the foreign connection now. + scheduleAsyncForeignConnectionReconnect(); + } + + return true; + } + public void start() throws Exception { - init(); if (started.compareAndSet(false, true)) { - for (int i = 0; i < inboundBridges.size(); i++) { - DestinationBridge bridge = inboundBridges.get(i); + init(); + for (DestinationBridge bridge : inboundBridges) { bridge.start(); } - for (int i = 0; i < outboundBridges.size(); i++) { - DestinationBridge bridge = outboundBridges.get(i); + for (DestinationBridge bridge : outboundBridges) { bridge.start(); } LOG.info("JMS Connector " + getName() + " Started"); @@ -129,21 +165,23 @@ public abstract class JmsConnector implements Service { public void stop() throws Exception { if (started.compareAndSet(true, false)) { - for (int i = 0; i < inboundBridges.size(); i++) { - DestinationBridge bridge = inboundBridges.get(i); + + this.connectionSerivce.shutdown(); + + for (DestinationBridge bridge : inboundBridges) { bridge.stop(); } - for (int i = 0; i < outboundBridges.size(); i++) { - DestinationBridge bridge = outboundBridges.get(i); + for (DestinationBridge bridge : outboundBridges) { bridge.stop(); } LOG.info("JMS Connector " + getName() + " Stopped"); } } - + public void clearBridges() { inboundBridges.clear(); outboundBridges.clear(); + replyToBridges.clear(); } protected abstract Destination createReplyToBridge(Destination destination, Connection consumerConnection, Connection producerConnection); @@ -151,13 +189,21 @@ public abstract class JmsConnector implements Service { /** * One way to configure the local connection - this is called by The * BrokerService when the Connector is embedded - * + * * @param service */ public void setBrokerService(BrokerService service) { embeddedConnectionFactory = new ActiveMQConnectionFactory(service.getVmConnectorURI()); } + public Connection getLocalConnection() { + return this.localConnection.get(); + } + + public Connection getForeignConnection() { + return this.foreignConnection.get(); + } + /** * @return Returns the jndiTemplate. */ @@ -222,8 +268,7 @@ public abstract class JmsConnector implements Service { } /** - * @param replyToDestinationCacheSize The replyToDestinationCacheSize to - * set. + * @param replyToDestinationCacheSize The replyToDestinationCacheSize to set. */ public void setReplyToDestinationCacheSize(int replyToDestinationCacheSize) { this.replyToDestinationCacheSize = replyToDestinationCacheSize; @@ -284,7 +329,7 @@ public abstract class JmsConnector implements Service { public void setOutboundUsername(String outboundUsername) { this.outboundUsername = outboundUsername; } - + /** * @return the outboundClientId */ @@ -312,14 +357,38 @@ public abstract class JmsConnector implements Service { public void setLocalClientId(String localClientId) { this.localClientId = localClientId; } - - + + /** + * @return the currently configured reconnection policy. + */ + public ReconnectionPolicy getReconnectionPolicy() { + return this.policy; + } + + /** + * @param policy The new reconnection policy this {@link JmsConnector} should use. + */ + public void setReconnectionPolicy(ReconnectionPolicy policy) { + this.policy = policy; + } + + /** + * @return returns true if the {@link JmsConnector} is connected to both brokers. + */ + public boolean isConnected() { + return localConnection.get() != null && foreignConnection.get() != null; + } + protected void addInboundBridge(DestinationBridge bridge) { - inboundBridges.add(bridge); + if (!inboundBridges.contains(bridge)) { + inboundBridges.add(bridge); + } } protected void addOutboundBridge(DestinationBridge bridge) { - outboundBridges.add(bridge); + if (!outboundBridges.contains(bridge)) { + outboundBridges.add(bridge); + } } protected void removeInboundBridge(DestinationBridge bridge) { @@ -337,13 +406,205 @@ public abstract class JmsConnector implements Service { return name; } - private static synchronized int getNextId() { - return nextId++; - } - public void setName(String name) { this.name = name; } - public abstract void restartProducerConnection() throws NamingException, JMSException; + private static synchronized int getNextId() { + return nextId++; + } + + public boolean isFailed() { + return this.failed.get(); + } + + /** + * Performs the work of connection to the local side of the Connection. + *

+ * This creates the initial connection to the local end of the {@link JmsConnector} + * and then sets up all the destination bridges with the information needed to bridge + * on the local side of the connection. + * + * @throws Exception if the connection cannot be established for any reason. + */ + protected abstract void initializeLocalConnection() throws Exception; + + /** + * Performs the work of connection to the foreign side of the Connection. + *

+ * This creates the initial connection to the foreign end of the {@link JmsConnector} + * and then sets up all the destination bridges with the information needed to bridge + * on the foreign side of the connection. + * + * @throws Exception if the connection cannot be established for any reason. + */ + protected abstract void initializeForeignConnection() throws Exception; + + /** + * Callback method that the Destination bridges can use to report an exception to occurs + * during normal bridging operations. + * + * @param connection + * The connection that was in use when the failure occured. + */ + void handleConnectionFailure(Connection connection) { + + // Can happen if async exception listener kicks in at the same time. + if (connection == null || !this.started.get()) { + return; + } + + LOG.info("JmsConnector handling loss of connection [" + connection.toString() + "]"); + + // TODO - How do we handle the re-wiring of replyToBridges in this case. + replyToBridges.clear(); + + if (this.foreignConnection.compareAndSet((QueueConnection)connection, null)) { + + // Stop the inbound bridges when the foreign connection is dropped since + // the bridge has no consumer and needs to be restarted once a new connection + // to the foreign side is made. + for (DestinationBridge bridge : inboundBridges) { + try { + bridge.stop(); + } catch(Exception e) { + } + } + + // We got here first and cleared the connection, now we queue a reconnect. + this.connectionSerivce.execute(new Runnable() { + + @Override + public void run() { + try { + doInitializeConnection(false); + } catch (Exception e) { + LOG.error("Failed to initialize forgein connection for the JMSConnector", e); + } + } + }); + + } else if (this.localConnection.compareAndSet((QueueConnection)connection, null)) { + + // Stop the outbound bridges when the local connection is dropped since + // the bridge has no consumer and needs to be restarted once a new connection + // to the local side is made. + for (DestinationBridge bridge : outboundBridges) { + try { + bridge.stop(); + } catch(Exception e) { + } + } + + // We got here first and cleared the connection, now we queue a reconnect. + this.connectionSerivce.execute(new Runnable() { + + @Override + public void run() { + try { + doInitializeConnection(true); + } catch (Exception e) { + LOG.error("Failed to initialize local connection for the JMSConnector", e); + } + } + }); + } + } + + private void scheduleAsyncLocalConnectionReconnect() { + this.connectionSerivce.execute(new Runnable() { + @Override + public void run() { + try { + doInitializeConnection(true); + } catch (Exception e) { + LOG.error("Failed to initialize local connection for the JMSConnector", e); + } + } + }); + } + + private void scheduleAsyncForeignConnectionReconnect() { + this.connectionSerivce.execute(new Runnable() { + @Override + public void run() { + try { + doInitializeConnection(false); + } catch (Exception e) { + LOG.error("Failed to initialize forgein connection for the JMSConnector", e); + } + } + }); + } + + private void doInitializeConnection(boolean local) throws Exception { + + int attempt = 0; + + final int maxRetries; + if (local) { + maxRetries = !localSideInitialized.get() ? policy.getMaxInitialConnectAttempts() : + policy.getMaxReconnectAttempts(); + } else { + maxRetries = !foreignSideInitialized.get() ? policy.getMaxInitialConnectAttempts() : + policy.getMaxReconnectAttempts(); + } + + do + { + if (attempt > 0) { + try { + Thread.sleep(policy.getNextDelay(attempt)); + } catch(InterruptedException e) { + } + } + + if (connectionSerivce.isTerminating()) { + return; + } + + try { + + if (local) { + initializeLocalConnection(); + localSideInitialized.set(true); + } else { + initializeForeignConnection(); + foreignSideInitialized.set(true); + } + + // Once we are connected we ensure all the bridges are started. + if (localConnection.get() != null && foreignConnection.get() != null) { + for (DestinationBridge bridge : inboundBridges) { + bridge.start(); + } + for (DestinationBridge bridge : outboundBridges) { + bridge.start(); + } + } + + return; + } catch(Exception e) { + LOG.debug("Failed to establish initial " + (local ? "local" : "foriegn") + + " connection for JmsConnector [" + attempt + "]: " + e.getMessage()); + } + } + while (maxRetries < ++attempt && !connectionSerivce.isTerminating()); + + this.failed.set(true); + } + + private ThreadFactory factory = new ThreadFactory() { + public Thread newThread(Runnable runnable) { + Thread thread = new Thread(runnable, "JmsConnector Async Connection Task: "); + thread.setDaemon(true); + return thread; + } + }; + + private ThreadPoolExecutor createExecutor() { + ThreadPoolExecutor exec = new ThreadPoolExecutor(0, 2, 30, TimeUnit.SECONDS, new LinkedBlockingQueue(), factory); + exec.allowCoreThreadTimeOut(true); + return exec; + } } diff --git a/activemq-core/src/main/java/org/apache/activemq/network/jms/JmsMesageConvertor.java b/activemq-core/src/main/java/org/apache/activemq/network/jms/JmsMesageConvertor.java index a8b7c7d21e..ca4891de31 100755 --- a/activemq-core/src/main/java/org/apache/activemq/network/jms/JmsMesageConvertor.java +++ b/activemq-core/src/main/java/org/apache/activemq/network/jms/JmsMesageConvertor.java @@ -23,21 +23,35 @@ import javax.jms.Message; /** * Converts Message from one JMS to another - * - * */ public interface JmsMesageConvertor { - + /** * Convert a foreign JMS Message to a native ActiveMQ Message + * * @param message + * The target message to convert to a native ActiveMQ message + * * @return the converted message * @throws JMSException */ Message convert(Message message) throws JMSException; - + + /** + * Convert a foreign JMS Message to a native ActiveMQ Message - Inbound or + * visa-versa outbound. If the replyTo Destination instance is not null + * then the Message is configured with the given replyTo value. + * + * @param message + * The target message to convert to a native ActiveMQ message + * @param replyTo + * The replyTo Destination to set on the converted Message. + * + * @return the converted message + * @throws JMSException + */ Message convert(Message message, Destination replyTo) throws JMSException; - + void setConnection(Connection connection); - + } diff --git a/activemq-core/src/main/java/org/apache/activemq/network/jms/JmsQueueConnector.java b/activemq-core/src/main/java/org/apache/activemq/network/jms/JmsQueueConnector.java index 9f14767fb1..61d3de42e0 100755 --- a/activemq-core/src/main/java/org/apache/activemq/network/jms/JmsQueueConnector.java +++ b/activemq-core/src/main/java/org/apache/activemq/network/jms/JmsQueueConnector.java @@ -18,6 +18,7 @@ package org.apache.activemq.network.jms; import javax.jms.Connection; import javax.jms.Destination; +import javax.jms.ExceptionListener; import javax.jms.JMSException; import javax.jms.Queue; import javax.jms.QueueConnection; @@ -31,10 +32,8 @@ import org.slf4j.LoggerFactory; /** * A Bridge to other JMS Queue providers - * + * * @org.apache.xbean.XBean - * - * */ public class JmsQueueConnector extends JmsConnector { private static final Logger LOG = LoggerFactory.getLogger(JmsQueueConnector.class); @@ -42,28 +41,9 @@ public class JmsQueueConnector extends JmsConnector { private String localConnectionFactoryName; private QueueConnectionFactory outboundQueueConnectionFactory; private QueueConnectionFactory localQueueConnectionFactory; - private QueueConnection outboundQueueConnection; - private QueueConnection localQueueConnection; private InboundQueueBridge[] inboundQueueBridges; private OutboundQueueBridge[] outboundQueueBridges; - public boolean init() { - boolean result = super.init(); - if (result) { - try { - initializeForeignQueueConnection(); - initializeLocalQueueConnection(); - initializeInboundJmsMessageConvertor(); - initializeOutboundJmsMessageConvertor(); - initializeInboundQueueBridges(); - initializeOutboundQueueBridges(); - } catch (Exception e) { - LOG.error("Failed to initialize the JMSConnector", e); - } - } - return result; - } - /** * @return Returns the inboundQueueBridges. */ @@ -147,28 +127,28 @@ public class JmsQueueConnector extends JmsConnector { * @return Returns the localQueueConnection. */ public QueueConnection getLocalQueueConnection() { - return localQueueConnection; + return (QueueConnection) localConnection.get(); } /** * @param localQueueConnection The localQueueConnection to set. */ public void setLocalQueueConnection(QueueConnection localQueueConnection) { - this.localQueueConnection = localQueueConnection; + this.localConnection.set(localQueueConnection); } /** * @return Returns the outboundQueueConnection. */ public QueueConnection getOutboundQueueConnection() { - return outboundQueueConnection; + return (QueueConnection) foreignConnection.get(); } /** * @param outboundQueueConnection The outboundQueueConnection to set. */ public void setOutboundQueueConnection(QueueConnection foreignQueueConnection) { - this.outboundQueueConnection = foreignQueueConnection; + this.foreignConnection.set(foreignQueueConnection); } /** @@ -179,27 +159,12 @@ public class JmsQueueConnector extends JmsConnector { this.outboundQueueConnectionFactory = foreignQueueConnectionFactory; } - public void restartProducerConnection() throws NamingException, JMSException { - outboundQueueConnection = null; - initializeForeignQueueConnection(); + @Override + protected void initializeForeignConnection() throws NamingException, JMSException { - // the outboundQueueConnection was reestablished - publish the new connection to the bridges - if (inboundQueueBridges != null) { - for (int i = 0; i < inboundQueueBridges.length; i++) { - InboundQueueBridge bridge = inboundQueueBridges[i]; - bridge.setConsumerConnection(outboundQueueConnection); - } - } - if (outboundQueueBridges != null) { - for (int i = 0; i < outboundQueueBridges.length; i++) { - OutboundQueueBridge bridge = outboundQueueBridges[i]; - bridge.setProducerConnection(outboundQueueConnection); - } - } - } + final QueueConnection newConnection; - protected void initializeForeignQueueConnection() throws NamingException, JMSException { - if (outboundQueueConnection == null) { + if (foreignConnection.get() == null) { // get the connection factories if (outboundQueueConnectionFactory == null) { // look it up from JNDI @@ -207,31 +172,57 @@ public class JmsQueueConnector extends JmsConnector { outboundQueueConnectionFactory = (QueueConnectionFactory)jndiOutboundTemplate .lookup(outboundQueueConnectionFactoryName, QueueConnectionFactory.class); if (outboundUsername != null) { - outboundQueueConnection = outboundQueueConnectionFactory + newConnection = outboundQueueConnectionFactory .createQueueConnection(outboundUsername, outboundPassword); } else { - outboundQueueConnection = outboundQueueConnectionFactory.createQueueConnection(); + newConnection = outboundQueueConnectionFactory.createQueueConnection(); } } else { throw new JMSException("Cannot create foreignConnection - no information"); } } else { if (outboundUsername != null) { - outboundQueueConnection = outboundQueueConnectionFactory + newConnection = outboundQueueConnectionFactory .createQueueConnection(outboundUsername, outboundPassword); } else { - outboundQueueConnection = outboundQueueConnectionFactory.createQueueConnection(); + newConnection = outboundQueueConnectionFactory.createQueueConnection(); } } + } else { + // Clear if for now in case something goes wrong during the init. + newConnection = (QueueConnection) foreignConnection.getAndSet(null); } - if (localClientId != null && localClientId.length() > 0) { - outboundQueueConnection.setClientID(getOutboundClientId()); + + if (outboundClientId != null && outboundClientId.length() > 0) { + newConnection.setClientID(getOutboundClientId()); } - outboundQueueConnection.start(); + newConnection.start(); + + outboundMessageConvertor.setConnection(newConnection); + + // Configure the bridges with the new Outbound connection. + initializeInboundDestinationBridgesOutboundSide(newConnection); + initializeOutboundDestinationBridgesOutboundSide(newConnection); + + // Register for any async error notifications now so we can reset in the + // case where there's not a lot of activity and a connection drops. + newConnection.setExceptionListener(new ExceptionListener() { + @Override + public void onException(JMSException exception) { + handleConnectionFailure(newConnection); + } + }); + + // At this point all looks good, so this our current connection now. + foreignConnection.set(newConnection); } - protected void initializeLocalQueueConnection() throws NamingException, JMSException { - if (localQueueConnection == null) { + @Override + protected void initializeLocalConnection() throws NamingException, JMSException { + + final QueueConnection newConnection; + + if (localConnection.get() == null) { // get the connection factories if (localQueueConnectionFactory == null) { if (embeddedConnectionFactory == null) { @@ -240,83 +231,100 @@ public class JmsQueueConnector extends JmsConnector { localQueueConnectionFactory = (QueueConnectionFactory)jndiLocalTemplate .lookup(localConnectionFactoryName, QueueConnectionFactory.class); if (localUsername != null) { - localQueueConnection = localQueueConnectionFactory + newConnection = localQueueConnectionFactory .createQueueConnection(localUsername, localPassword); } else { - localQueueConnection = localQueueConnectionFactory.createQueueConnection(); + newConnection = localQueueConnectionFactory.createQueueConnection(); } } else { throw new JMSException("Cannot create localConnection - no information"); } } else { - localQueueConnection = embeddedConnectionFactory.createQueueConnection(); + newConnection = embeddedConnectionFactory.createQueueConnection(); } } else { if (localUsername != null) { - localQueueConnection = localQueueConnectionFactory.createQueueConnection(localUsername, - localPassword); + newConnection = localQueueConnectionFactory. + createQueueConnection(localUsername, localPassword); } else { - localQueueConnection = localQueueConnectionFactory.createQueueConnection(); + newConnection = localQueueConnectionFactory.createQueueConnection(); } } + + } else { + // Clear if for now in case something goes wrong during the init. + newConnection = (QueueConnection) localConnection.getAndSet(null); } + if (localClientId != null && localClientId.length() > 0) { - localQueueConnection.setClientID(getLocalClientId()); + newConnection.setClientID(getLocalClientId()); } - localQueueConnection.start(); + newConnection.start(); + + inboundMessageConvertor.setConnection(newConnection); + + // Configure the bridges with the new Local connection. + initializeInboundDestinationBridgesLocalSide(newConnection); + initializeOutboundDestinationBridgesLocalSide(newConnection); + + // Register for any async error notifications now so we can reset in the + // case where there's not a lot of activity and a connection drops. + newConnection.setExceptionListener(new ExceptionListener() { + @Override + public void onException(JMSException exception) { + handleConnectionFailure(newConnection); + } + }); + + // At this point all looks good, so this our current connection now. + localConnection.set(newConnection); } - protected void initializeInboundJmsMessageConvertor() { - inboundMessageConvertor.setConnection(localQueueConnection); - } - - protected void initializeOutboundJmsMessageConvertor() { - outboundMessageConvertor.setConnection(outboundQueueConnection); - } - - protected void initializeInboundQueueBridges() throws JMSException { + protected void initializeInboundDestinationBridgesOutboundSide(QueueConnection connection) throws JMSException { if (inboundQueueBridges != null) { - QueueSession outboundSession = outboundQueueConnection - .createQueueSession(false, Session.AUTO_ACKNOWLEDGE); - QueueSession localSession = localQueueConnection.createQueueSession(false, - Session.AUTO_ACKNOWLEDGE); - for (int i = 0; i < inboundQueueBridges.length; i++) { - InboundQueueBridge bridge = inboundQueueBridges[i]; - String localQueueName = bridge.getLocalQueueName(); - Queue activemqQueue = createActiveMQQueue(localSession, localQueueName); + QueueSession outboundSession = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); + + for (InboundQueueBridge bridge : inboundQueueBridges) { String queueName = bridge.getInboundQueueName(); Queue foreignQueue = createForeignQueue(outboundSession, queueName); + bridge.setConsumer(null); bridge.setConsumerQueue(foreignQueue); + bridge.setConsumerConnection(connection); + bridge.setJmsConnector(this); + addInboundBridge(bridge); + } + outboundSession.close(); + } + } + + protected void initializeInboundDestinationBridgesLocalSide(QueueConnection connection) throws JMSException { + if (inboundQueueBridges != null) { + QueueSession localSession = connection.createQueueSession(false,Session.AUTO_ACKNOWLEDGE); + + for (InboundQueueBridge bridge : inboundQueueBridges) { + String localQueueName = bridge.getLocalQueueName(); + Queue activemqQueue = createActiveMQQueue(localSession, localQueueName); bridge.setProducerQueue(activemqQueue); - bridge.setProducerConnection(localQueueConnection); - bridge.setConsumerConnection(outboundQueueConnection); + bridge.setProducerConnection(connection); if (bridge.getJmsMessageConvertor() == null) { bridge.setJmsMessageConvertor(getInboundMessageConvertor()); } bridge.setJmsConnector(this); addInboundBridge(bridge); } - outboundSession.close(); localSession.close(); } } - protected void initializeOutboundQueueBridges() throws JMSException { + protected void initializeOutboundDestinationBridgesOutboundSide(QueueConnection connection) throws JMSException { if (outboundQueueBridges != null) { - QueueSession outboundSession = outboundQueueConnection - .createQueueSession(false, Session.AUTO_ACKNOWLEDGE); - QueueSession localSession = localQueueConnection.createQueueSession(false, - Session.AUTO_ACKNOWLEDGE); - for (int i = 0; i < outboundQueueBridges.length; i++) { - OutboundQueueBridge bridge = outboundQueueBridges[i]; - String localQueueName = bridge.getLocalQueueName(); - Queue activemqQueue = createActiveMQQueue(localSession, localQueueName); + QueueSession outboundSession = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); + + for (OutboundQueueBridge bridge : outboundQueueBridges) { String queueName = bridge.getOutboundQueueName(); Queue foreignQueue = createForeignQueue(outboundSession, queueName); - bridge.setConsumerQueue(activemqQueue); bridge.setProducerQueue(foreignQueue); - bridge.setProducerConnection(outboundQueueConnection); - bridge.setConsumerConnection(localQueueConnection); + bridge.setProducerConnection(connection); if (bridge.getJmsMessageConvertor() == null) { bridge.setJmsMessageConvertor(getOutboundMessageConvertor()); } @@ -324,6 +332,23 @@ public class JmsQueueConnector extends JmsConnector { addOutboundBridge(bridge); } outboundSession.close(); + } + } + + protected void initializeOutboundDestinationBridgesLocalSide(QueueConnection connection) throws JMSException { + if (outboundQueueBridges != null) { + QueueSession localSession = + connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); + + for (OutboundQueueBridge bridge : outboundQueueBridges) { + String localQueueName = bridge.getLocalQueueName(); + Queue activemqQueue = createActiveMQQueue(localSession, localQueueName); + bridge.setConsumer(null); + bridge.setConsumerQueue(activemqQueue); + bridge.setConsumerConnection(connection); + bridge.setJmsConnector(this); + addOutboundBridge(bridge); + } localSession.close(); } } @@ -331,7 +356,7 @@ public class JmsQueueConnector extends JmsConnector { protected Destination createReplyToBridge(Destination destination, Connection replyToProducerConnection, Connection replyToConsumerConnection) { Queue replyToProducerQueue = (Queue)destination; - boolean isInbound = replyToProducerConnection.equals(localQueueConnection); + boolean isInbound = replyToProducerConnection.equals(localConnection.get()); if (isInbound) { InboundQueueBridge bridge = (InboundQueueBridge)replyToBridges.get(replyToProducerQueue); diff --git a/activemq-core/src/main/java/org/apache/activemq/network/jms/JmsTopicConnector.java b/activemq-core/src/main/java/org/apache/activemq/network/jms/JmsTopicConnector.java index 7c87d2a5a2..2d5389104b 100755 --- a/activemq-core/src/main/java/org/apache/activemq/network/jms/JmsTopicConnector.java +++ b/activemq-core/src/main/java/org/apache/activemq/network/jms/JmsTopicConnector.java @@ -18,12 +18,13 @@ package org.apache.activemq.network.jms; import javax.jms.Connection; import javax.jms.Destination; +import javax.jms.ExceptionListener; import javax.jms.JMSException; -import javax.jms.Session; import javax.jms.Topic; import javax.jms.TopicConnection; import javax.jms.TopicConnectionFactory; import javax.jms.TopicSession; +import javax.jms.Session; import javax.naming.NamingException; import org.slf4j.Logger; @@ -31,10 +32,8 @@ import org.slf4j.LoggerFactory; /** * A Bridge to other JMS Topic providers - * + * * @org.apache.xbean.XBean - * - * */ public class JmsTopicConnector extends JmsConnector { private static final Logger LOG = LoggerFactory.getLogger(JmsTopicConnector.class); @@ -42,28 +41,9 @@ public class JmsTopicConnector extends JmsConnector { private String localConnectionFactoryName; private TopicConnectionFactory outboundTopicConnectionFactory; private TopicConnectionFactory localTopicConnectionFactory; - private TopicConnection outboundTopicConnection; - private TopicConnection localTopicConnection; private InboundTopicBridge[] inboundTopicBridges; private OutboundTopicBridge[] outboundTopicBridges; - public boolean init() { - boolean result = super.init(); - if (result) { - try { - initializeForeignTopicConnection(); - initializeLocalTopicConnection(); - initializeInboundJmsMessageConvertor(); - initializeOutboundJmsMessageConvertor(); - initializeInboundTopicBridges(); - initializeOutboundTopicBridges(); - } catch (Exception e) { - LOG.error("Failed to initialize the JMSConnector", e); - } - } - return result; - } - /** * @return Returns the inboundTopicBridges. */ @@ -100,8 +80,7 @@ public class JmsTopicConnector extends JmsConnector { } /** - * @param localTopicConnectionFactory The localTopicConnectionFactory to - * set. + * @param localTopicConnectionFactory The localTopicConnectionFactory to set. */ public void setLocalTopicConnectionFactory(TopicConnectionFactory localConnectionFactory) { this.localTopicConnectionFactory = localConnectionFactory; @@ -122,8 +101,7 @@ public class JmsTopicConnector extends JmsConnector { } /** - * @param outboundTopicConnectionFactoryName The - * outboundTopicConnectionFactoryName to set. + * @param outboundTopicConnectionFactoryName The outboundTopicConnectionFactoryName to set. */ public void setOutboundTopicConnectionFactoryName(String foreignTopicConnectionFactoryName) { this.outboundTopicConnectionFactoryName = foreignTopicConnectionFactoryName; @@ -147,45 +125,43 @@ public class JmsTopicConnector extends JmsConnector { * @return Returns the localTopicConnection. */ public TopicConnection getLocalTopicConnection() { - return localTopicConnection; + return (TopicConnection) localConnection.get(); } /** * @param localTopicConnection The localTopicConnection to set. */ public void setLocalTopicConnection(TopicConnection localTopicConnection) { - this.localTopicConnection = localTopicConnection; + this.localConnection.set(localTopicConnection); } /** * @return Returns the outboundTopicConnection. */ public TopicConnection getOutboundTopicConnection() { - return outboundTopicConnection; + return (TopicConnection) foreignConnection.get(); } /** * @param outboundTopicConnection The outboundTopicConnection to set. */ public void setOutboundTopicConnection(TopicConnection foreignTopicConnection) { - this.outboundTopicConnection = foreignTopicConnection; + this.foreignConnection.set(foreignTopicConnection); } /** - * @param outboundTopicConnectionFactory The outboundTopicConnectionFactory - * to set. + * @param outboundTopicConnectionFactory The outboundTopicConnectionFactory to set. */ public void setOutboundTopicConnectionFactory(TopicConnectionFactory foreignTopicConnectionFactory) { this.outboundTopicConnectionFactory = foreignTopicConnectionFactory; } - public void restartProducerConnection() throws NamingException, JMSException { - outboundTopicConnection = null; - initializeForeignTopicConnection(); - } + @Override + protected void initializeForeignConnection() throws NamingException, JMSException { - protected void initializeForeignTopicConnection() throws NamingException, JMSException { - if (outboundTopicConnection == null) { + final TopicConnection newConnection; + + if (foreignConnection.get() == null) { // get the connection factories if (outboundTopicConnectionFactory == null) { // look it up from JNDI @@ -193,31 +169,57 @@ public class JmsTopicConnector extends JmsConnector { outboundTopicConnectionFactory = (TopicConnectionFactory)jndiOutboundTemplate .lookup(outboundTopicConnectionFactoryName, TopicConnectionFactory.class); if (outboundUsername != null) { - outboundTopicConnection = outboundTopicConnectionFactory + newConnection = outboundTopicConnectionFactory .createTopicConnection(outboundUsername, outboundPassword); } else { - outboundTopicConnection = outboundTopicConnectionFactory.createTopicConnection(); + newConnection = outboundTopicConnectionFactory.createTopicConnection(); } } else { - throw new JMSException("Cannot create localConnection - no information"); + throw new JMSException("Cannot create foreignConnection - no information"); } } else { if (outboundUsername != null) { - outboundTopicConnection = outboundTopicConnectionFactory + newConnection = outboundTopicConnectionFactory .createTopicConnection(outboundUsername, outboundPassword); } else { - outboundTopicConnection = outboundTopicConnectionFactory.createTopicConnection(); + newConnection = outboundTopicConnectionFactory.createTopicConnection(); } } + } else { + // Clear if for now in case something goes wrong during the init. + newConnection = (TopicConnection) foreignConnection.getAndSet(null); } - if (localClientId != null && localClientId.length() > 0) { - outboundTopicConnection.setClientID(getOutboundClientId()); + + if (outboundClientId != null && outboundClientId.length() > 0) { + newConnection.setClientID(getOutboundClientId()); } - outboundTopicConnection.start(); + newConnection.start(); + + outboundMessageConvertor.setConnection(newConnection); + + // Configure the bridges with the new Outbound connection. + initializeInboundDestinationBridgesOutboundSide(newConnection); + initializeOutboundDestinationBridgesOutboundSide(newConnection); + + // Register for any async error notifications now so we can reset in the + // case where there's not a lot of activity and a connection drops. + newConnection.setExceptionListener(new ExceptionListener() { + @Override + public void onException(JMSException exception) { + handleConnectionFailure(newConnection); + } + }); + + // At this point all looks good, so this our current connection now. + foreignConnection.set(newConnection); } - protected void initializeLocalTopicConnection() throws NamingException, JMSException { - if (localTopicConnection == null) { + @Override + protected void initializeLocalConnection() throws NamingException, JMSException { + + final TopicConnection newConnection; + + if (localConnection.get() == null) { // get the connection factories if (localTopicConnectionFactory == null) { if (embeddedConnectionFactory == null) { @@ -226,83 +228,100 @@ public class JmsTopicConnector extends JmsConnector { localTopicConnectionFactory = (TopicConnectionFactory)jndiLocalTemplate .lookup(localConnectionFactoryName, TopicConnectionFactory.class); if (localUsername != null) { - localTopicConnection = localTopicConnectionFactory + newConnection = localTopicConnectionFactory .createTopicConnection(localUsername, localPassword); } else { - localTopicConnection = localTopicConnectionFactory.createTopicConnection(); + newConnection = localTopicConnectionFactory.createTopicConnection(); } } else { throw new JMSException("Cannot create localConnection - no information"); } } else { - localTopicConnection = embeddedConnectionFactory.createTopicConnection(); + newConnection = embeddedConnectionFactory.createTopicConnection(); } } else { if (localUsername != null) { - localTopicConnection = localTopicConnectionFactory.createTopicConnection(localUsername, - localPassword); + newConnection = localTopicConnectionFactory. + createTopicConnection(localUsername, localPassword); } else { - localTopicConnection = localTopicConnectionFactory.createTopicConnection(); + newConnection = localTopicConnectionFactory.createTopicConnection(); } } + + } else { + // Clear if for now in case something goes wrong during the init. + newConnection = (TopicConnection) localConnection.getAndSet(null); } + if (localClientId != null && localClientId.length() > 0) { - localTopicConnection.setClientID(getLocalClientId()); + newConnection.setClientID(getLocalClientId()); } - localTopicConnection.start(); + newConnection.start(); + + inboundMessageConvertor.setConnection(newConnection); + + // Configure the bridges with the new Local connection. + initializeInboundDestinationBridgesLocalSide(newConnection); + initializeOutboundDestinationBridgesLocalSide(newConnection); + + // Register for any async error notifications now so we can reset in the + // case where there's not a lot of activity and a connection drops. + newConnection.setExceptionListener(new ExceptionListener() { + @Override + public void onException(JMSException exception) { + handleConnectionFailure(newConnection); + } + }); + + // At this point all looks good, so this our current connection now. + localConnection.set(newConnection); } - protected void initializeInboundJmsMessageConvertor() { - inboundMessageConvertor.setConnection(localTopicConnection); - } - - protected void initializeOutboundJmsMessageConvertor() { - outboundMessageConvertor.setConnection(outboundTopicConnection); - } - - protected void initializeInboundTopicBridges() throws JMSException { + protected void initializeInboundDestinationBridgesOutboundSide(TopicConnection connection) throws JMSException { if (inboundTopicBridges != null) { - TopicSession outboundSession = outboundTopicConnection - .createTopicSession(false, Session.AUTO_ACKNOWLEDGE); - TopicSession localSession = localTopicConnection.createTopicSession(false, - Session.AUTO_ACKNOWLEDGE); - for (int i = 0; i < inboundTopicBridges.length; i++) { - InboundTopicBridge bridge = inboundTopicBridges[i]; + TopicSession outboundSession = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); + + for (InboundTopicBridge bridge : inboundTopicBridges) { + String TopicName = bridge.getInboundTopicName(); + Topic foreignTopic = createForeignTopic(outboundSession, TopicName); + bridge.setConsumer(null); + bridge.setConsumerTopic(foreignTopic); + bridge.setConsumerConnection(connection); + bridge.setJmsConnector(this); + addInboundBridge(bridge); + } + outboundSession.close(); + } + } + + protected void initializeInboundDestinationBridgesLocalSide(TopicConnection connection) throws JMSException { + if (inboundTopicBridges != null) { + TopicSession localSession = connection.createTopicSession(false,Session.AUTO_ACKNOWLEDGE); + + for (InboundTopicBridge bridge : inboundTopicBridges) { String localTopicName = bridge.getLocalTopicName(); Topic activemqTopic = createActiveMQTopic(localSession, localTopicName); - String topicName = bridge.getInboundTopicName(); - Topic foreignTopic = createForeignTopic(outboundSession, topicName); - bridge.setConsumerTopic(foreignTopic); bridge.setProducerTopic(activemqTopic); - bridge.setProducerConnection(localTopicConnection); - bridge.setConsumerConnection(outboundTopicConnection); + bridge.setProducerConnection(connection); if (bridge.getJmsMessageConvertor() == null) { bridge.setJmsMessageConvertor(getInboundMessageConvertor()); } bridge.setJmsConnector(this); addInboundBridge(bridge); } - outboundSession.close(); localSession.close(); } } - protected void initializeOutboundTopicBridges() throws JMSException { + protected void initializeOutboundDestinationBridgesOutboundSide(TopicConnection connection) throws JMSException { if (outboundTopicBridges != null) { - TopicSession outboundSession = outboundTopicConnection - .createTopicSession(false, Session.AUTO_ACKNOWLEDGE); - TopicSession localSession = localTopicConnection.createTopicSession(false, - Session.AUTO_ACKNOWLEDGE); - for (int i = 0; i < outboundTopicBridges.length; i++) { - OutboundTopicBridge bridge = outboundTopicBridges[i]; - String localTopicName = bridge.getLocalTopicName(); - Topic activemqTopic = createActiveMQTopic(localSession, localTopicName); + TopicSession outboundSession = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); + + for (OutboundTopicBridge bridge : outboundTopicBridges) { String topicName = bridge.getOutboundTopicName(); Topic foreignTopic = createForeignTopic(outboundSession, topicName); - bridge.setConsumerTopic(activemqTopic); bridge.setProducerTopic(foreignTopic); - bridge.setProducerConnection(outboundTopicConnection); - bridge.setConsumerConnection(localTopicConnection); + bridge.setProducerConnection(connection); if (bridge.getJmsMessageConvertor() == null) { bridge.setJmsMessageConvertor(getOutboundMessageConvertor()); } @@ -310,6 +329,23 @@ public class JmsTopicConnector extends JmsConnector { addOutboundBridge(bridge); } outboundSession.close(); + } + } + + protected void initializeOutboundDestinationBridgesLocalSide(TopicConnection connection) throws JMSException { + if (outboundTopicBridges != null) { + TopicSession localSession = + connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); + + for (OutboundTopicBridge bridge : outboundTopicBridges) { + String localTopicName = bridge.getLocalTopicName(); + Topic activemqTopic = createActiveMQTopic(localSession, localTopicName); + bridge.setConsumer(null); + bridge.setConsumerTopic(activemqTopic); + bridge.setConsumerConnection(connection); + bridge.setJmsConnector(this); + addOutboundBridge(bridge); + } localSession.close(); } } @@ -317,7 +353,7 @@ public class JmsTopicConnector extends JmsConnector { protected Destination createReplyToBridge(Destination destination, Connection replyToProducerConnection, Connection replyToConsumerConnection) { Topic replyToProducerTopic = (Topic)destination; - boolean isInbound = replyToProducerConnection.equals(localTopicConnection); + boolean isInbound = replyToProducerConnection.equals(localConnection.get()); if (isInbound) { InboundTopicBridge bridge = (InboundTopicBridge)replyToBridges.get(replyToProducerTopic); diff --git a/activemq-core/src/main/java/org/apache/activemq/network/jms/OutboundQueueBridge.java b/activemq-core/src/main/java/org/apache/activemq/network/jms/OutboundQueueBridge.java index c9eebd1a9e..9714531b19 100755 --- a/activemq-core/src/main/java/org/apache/activemq/network/jms/OutboundQueueBridge.java +++ b/activemq-core/src/main/java/org/apache/activemq/network/jms/OutboundQueueBridge.java @@ -17,11 +17,12 @@ package org.apache.activemq.network.jms; /** - * Create an Outbound Queue Bridge - * + * Create an Outbound Queue Bridge. By default the bridge uses the same + * name for both the inbound and outbound queues, however this can be altered + * by using the public setter methods to configure both inbound and outbound + * queue names. + * * @org.apache.xbean.XBean - * - * */ public class OutboundQueueBridge extends QueueBridge { @@ -30,7 +31,7 @@ public class OutboundQueueBridge extends QueueBridge { /** * Constructor that takes a foreign destination as an argument - * + * * @param outboundQueueName */ public OutboundQueueBridge(String outboundQueueName) { @@ -39,7 +40,7 @@ public class OutboundQueueBridge extends QueueBridge { } /** - * Default Contructor + * Default Constructor */ public OutboundQueueBridge() { } @@ -52,6 +53,10 @@ public class OutboundQueueBridge extends QueueBridge { } /** + * Sets the name of the outbound queue name. If the inbound queue name + * has not been set already then this method uses the provided queue name + * to set the inbound topic name as well. + * * @param outboundQueueName The outboundQueueName to set. */ public void setOutboundQueueName(String outboundQueueName) { @@ -74,5 +79,4 @@ public class OutboundQueueBridge extends QueueBridge { public void setLocalQueueName(String localQueueName) { this.localQueueName = localQueueName; } - } diff --git a/activemq-core/src/main/java/org/apache/activemq/network/jms/OutboundTopicBridge.java b/activemq-core/src/main/java/org/apache/activemq/network/jms/OutboundTopicBridge.java index cb7c1c7648..52e27d96ec 100755 --- a/activemq-core/src/main/java/org/apache/activemq/network/jms/OutboundTopicBridge.java +++ b/activemq-core/src/main/java/org/apache/activemq/network/jms/OutboundTopicBridge.java @@ -17,11 +17,12 @@ package org.apache.activemq.network.jms; /** - * Create an Outbound Topic Bridge - * + * Create an Outbound Topic Bridge. By default the bridge uses the same + * name for both the inbound and outbound topics, however this can be altered + * by using the public setter methods to configure both inbound and outbound + * topic names. + * * @org.apache.xbean.XBean - * - * */ public class OutboundTopicBridge extends TopicBridge { @@ -30,7 +31,7 @@ public class OutboundTopicBridge extends TopicBridge { /** * Constructor that takes a foreign destination as an argument - * + * * @param outboundTopicName */ public OutboundTopicBridge(String outboundTopicName) { @@ -52,6 +53,10 @@ public class OutboundTopicBridge extends TopicBridge { } /** + * Sets the name of the outbound topic name. If the inbound topic name + * has not been set already then this method uses the provided topic name + * to set the inbound topic name as well. + * * @param outboundTopicName The outboundTopicName to set. */ public void setOutboundTopicName(String outboundTopicName) { diff --git a/activemq-core/src/main/java/org/apache/activemq/network/jms/QueueBridge.java b/activemq-core/src/main/java/org/apache/activemq/network/jms/QueueBridge.java index 34bad1efad..6dc38024fb 100755 --- a/activemq-core/src/main/java/org/apache/activemq/network/jms/QueueBridge.java +++ b/activemq-core/src/main/java/org/apache/activemq/network/jms/QueueBridge.java @@ -28,9 +28,7 @@ import javax.jms.QueueSession; import javax.jms.Session; /** - * A Destination bridge is used to bridge between to different JMS systems - * - * + * A Destination bridge is used to bridge Queues between to different JMS systems */ class QueueBridge extends DestinationBridge { protected Queue consumerQueue; @@ -55,6 +53,7 @@ class QueueBridge extends DestinationBridge { protected MessageConsumer createConsumer() throws JMSException { // set up the consumer + if (consumerConnection == null) return null; consumerSession = consumerConnection.createQueueSession(false, Session.CLIENT_ACKNOWLEDGE); MessageConsumer consumer = null; @@ -64,20 +63,28 @@ class QueueBridge extends DestinationBridge { consumer = consumerSession.createReceiver(consumerQueue); } + consumer.setMessageListener(this); + return consumer; } protected synchronized MessageProducer createProducer() throws JMSException { + if (producerConnection == null) return null; producerSession = producerConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); producer = producerSession.createSender(null); return producer; } protected synchronized void sendMessage(Message message) throws JMSException { - if (producer == null) { - createProducer(); + if (producer == null && createProducer() == null) { + throw new JMSException("Producer for remote queue not available."); + } + try { + producer.send(producerQueue, message); + } catch (JMSException e) { + producer = null; + throw e; } - producer.send(producerQueue, message); } /** @@ -92,6 +99,13 @@ class QueueBridge extends DestinationBridge { */ public void setConsumerConnection(QueueConnection consumerConnection) { this.consumerConnection = consumerConnection; + if (started.get()) { + try { + createConsumer(); + } catch(Exception e) { + jmsConnector.handleConnectionFailure(getConnnectionForConsumer()); + } + } } /** @@ -157,5 +171,4 @@ class QueueBridge extends DestinationBridge { protected Connection getConnectionForProducer() { return getProducerConnection(); } - } diff --git a/activemq-core/src/main/java/org/apache/activemq/network/jms/ReconnectionPolicy.java b/activemq-core/src/main/java/org/apache/activemq/network/jms/ReconnectionPolicy.java new file mode 100644 index 0000000000..65035587b5 --- /dev/null +++ b/activemq-core/src/main/java/org/apache/activemq/network/jms/ReconnectionPolicy.java @@ -0,0 +1,244 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.network.jms; + +/** + * A policy object that defines how a {@link JmsConnector} deals with + * reconnection of the local and foreign connections. + * + * @org.apache.xbean.XBean element="reconnectionPolicy" + */ +public class ReconnectionPolicy { + + private int maxSendRetries = 10; + private long sendRetryDelay = 1000L; + + private int maxReconnectAttempts = -1; + private int maxInitialConnectAttempts = -1; + private long maximumReconnectDelay = 30000; + private long initialReconnectDelay = 1000L; + private boolean useExponentialBackOff = false; + private double backOffMultiplier = 2.0; + + /** + * Gets the maximum number of a times a Message send should be retried before + * a JMSExeception is thrown indicating that the operation failed. + * + * @return number of send retries that will be performed. + */ + public int getMaxSendRetries() { + return maxSendRetries; + } + + /** + * Sets the maximum number of a times a Message send should be retried before + * a JMSExeception is thrown indicating that the operation failed. + * + * @param maxRetries + * number of send retries that will be performed. + */ + public void setMaxSendRetries(int maxSendRetries) { + this.maxSendRetries = maxSendRetries; + } + + /** + * Get the amount of time the DestionationBridge will wait between attempts + * to forward a message. + * + * @return time in milliseconds to wait between send attempts. + */ + public long getSendRetryDelay() { + return this.sendRetryDelay; + } + + /** + * Set the amount of time the DestionationBridge will wait between attempts + * to forward a message. The default policy limits the minimum time between + * send attempt to one second. + * + * @param sendRetryDelay + * Time in milliseconds to wait before attempting another send. + */ + public void setSendRetyDelay(long sendRetryDelay) { + if (sendRetryDelay < 1000L) { + this.sendRetryDelay = 1000L; + } + + this.sendRetryDelay = sendRetryDelay; + } + + /** + * Gets the number of time that {@link JmsConnector} will attempt to connect + * or reconnect before giving up. By default the policy sets this value to + * a negative value meaning try forever. + * + * @return the number of attempts to connect before giving up. + */ + public int getMaxReconnectAttempts() { + return maxReconnectAttempts; + } + + /** + * Sets the number of time that {@link JmsConnector} will attempt to connect + * or reconnect before giving up. By default the policy sets this value to + * a negative value meaning try forever, set to a positive value to retry a + * fixed number of times, or zero to never try and reconnect. + * + * @param maxReconnectAttempts + */ + public void setMaxReconnectAttempts(int maxReconnectAttempts) { + this.maxReconnectAttempts = maxReconnectAttempts; + } + + /** + * Gets the maximum number of times that the {@link JmsConnector} will try + * to connect on startup to before it marks itself as failed and does not + * try any further connections. + * + * @returns the max number of times a connection attempt is made before failing. + */ + public int getMaxInitialConnectAttempts() { + return this.maxInitialConnectAttempts; + } + + /** + * Sets the maximum number of times that the {@link JmsConnector} will try + * to connect on startup to before it marks itself as failed and does not + * try any further connections. + * + * @param maxAttempts + * The max number of times a connection attempt is made before failing. + */ + public void setMaxInitialConnectAttempts(int maxAttempts) { + this.maxInitialConnectAttempts = maxAttempts; + } + + /** + * Gets the maximum delay that is inserted between each attempt to connect + * before another attempt is made. The default setting for this value is + * 30 seconds. + * + * @return the max delay between connection attempts in milliseconds. + */ + public long getMaximumReconnectDelay() { + return maximumReconnectDelay; + } + + /** + * Sets the maximum delay that is inserted between each attempt to connect + * before another attempt is made. + * + * @param maximumReconnectDelay + * The maximum delay between connection attempts in milliseconds. + */ + public void setMaximumReconnectDelay(long maximumReconnectDelay) { + this.maximumReconnectDelay = maximumReconnectDelay; + } + + /** + * Gets the initial delay value used before a reconnection attempt is made. If the + * use exponential back-off value is set to false then this will be the fixed time + * between connection attempts. By default this value is set to one second. + * + * @return time in milliseconds that will be used between connection retries. + */ + public long getInitialReconnectDelay() { + return initialReconnectDelay; + } + + /** + * Gets the initial delay value used before a reconnection attempt is made. If the + * use exponential back-off value is set to false then this will be the fixed time + * between connection attempts. By default this value is set to one second. + + * @param initialReconnectDelay + * Time in milliseconds to wait before the first reconnection attempt. + */ + public void setInitialReconnectDelay(long initialReconnectDelay) { + this.initialReconnectDelay = initialReconnectDelay; + } + + /** + * Gets whether the policy uses the set back-off multiplier to grow the time between + * connection attempts. + * + * @return true if the policy will grow the time between connection attempts. + */ + public boolean isUseExponentialBackOff() { + return useExponentialBackOff; + } + + /** + * Sets whether the policy uses the set back-off multiplier to grow the time between + * connection attempts. + * + * @param useExponentialBackOff + */ + public void setUseExponentialBackOff(boolean useExponentialBackOff) { + this.useExponentialBackOff = useExponentialBackOff; + } + + /** + * Gets the multiplier used to grow the delay between connection attempts from the initial + * time to the max set time. By default this value is set to 2.0. + * + * @return the currently configured connection delay multiplier. + */ + public double getBackOffMultiplier() { + return backOffMultiplier; + } + + /** + * Gets the multiplier used to grow the delay between connection attempts from the initial + * time to the max set time. By default this value is set to 2.0. + * + * @param backOffMultiplier + * The multiplier value used to grow the reconnection delay. + */ + public void setBackOffMultiplier(double backOffMultiplier) { + this.backOffMultiplier = backOffMultiplier; + } + + /** + * Returns the next computed delay value that the connection controller should use to + * wait before attempting another connection for the {@link JmsConnector}. + * + * @param attempt + * The current connection attempt. + * + * @return the next delay amount in milliseconds. + */ + public long getNextDelay(int attempt) { + + if (attempt == 0) { + return 0; + } + + long nextDelay = initialReconnectDelay; + + if (useExponentialBackOff) { + nextDelay = nextDelay * (long)(attempt * backOffMultiplier); + } + + if (maximumReconnectDelay > 0 && nextDelay > maximumReconnectDelay) { + nextDelay = maximumReconnectDelay; + } + + return nextDelay; + } +} diff --git a/activemq-core/src/main/java/org/apache/activemq/network/jms/SimpleJmsMessageConvertor.java b/activemq-core/src/main/java/org/apache/activemq/network/jms/SimpleJmsMessageConvertor.java index e628b6f66c..5f15702dbf 100755 --- a/activemq-core/src/main/java/org/apache/activemq/network/jms/SimpleJmsMessageConvertor.java +++ b/activemq-core/src/main/java/org/apache/activemq/network/jms/SimpleJmsMessageConvertor.java @@ -23,18 +23,17 @@ import javax.jms.Message; /** * Converts Message from one JMS to another - * + * * @org.apache.xbean.XBean - * - * */ public class SimpleJmsMessageConvertor implements JmsMesageConvertor { /** * Convert a foreign JMS Message to a native ActiveMQ Message - Inbound or - * visa-versa outbound - * + * visa-versa outbound. + * * @param message + * The target message to convert to a native ActiveMQ message * @return the converted message * @throws JMSException */ @@ -42,6 +41,19 @@ public class SimpleJmsMessageConvertor implements JmsMesageConvertor { return message; } + /** + * Convert a foreign JMS Message to a native ActiveMQ Message - Inbound or + * visa-versa outbound. If the replyTo Destination instance is not null + * then the Message is configured with the given replyTo value. + * + * @param message + * The target message to convert to a native ActiveMQ message + * @param replyTo + * The replyTo Destination to set on the converted Message. + * + * @return the converted message + * @throws JMSException + */ public Message convert(Message message, Destination replyTo) throws JMSException { Message msg = convert(message); if (replyTo != null) { @@ -55,5 +67,4 @@ public class SimpleJmsMessageConvertor implements JmsMesageConvertor { public void setConnection(Connection connection) { // do nothing } - } diff --git a/activemq-core/src/main/java/org/apache/activemq/network/jms/TopicBridge.java b/activemq-core/src/main/java/org/apache/activemq/network/jms/TopicBridge.java index 6ca7fa5965..1480daf433 100755 --- a/activemq-core/src/main/java/org/apache/activemq/network/jms/TopicBridge.java +++ b/activemq-core/src/main/java/org/apache/activemq/network/jms/TopicBridge.java @@ -17,7 +17,6 @@ package org.apache.activemq.network.jms; import javax.jms.Connection; -import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; @@ -30,8 +29,8 @@ import javax.jms.TopicSession; /** * A Destination bridge is used to bridge between to different JMS systems - * - * + * + * */ class TopicBridge extends DestinationBridge { protected Topic consumerTopic; @@ -56,6 +55,7 @@ class TopicBridge extends DestinationBridge { protected MessageConsumer createConsumer() throws JMSException { // set up the consumer + if (consumerConnection == null) return null; consumerSession = consumerConnection.createTopicSession(false, Session.CLIENT_ACKNOWLEDGE); MessageConsumer consumer = null; if (consumerName != null && consumerName.length() > 0) { @@ -72,20 +72,29 @@ class TopicBridge extends DestinationBridge { consumer = consumerSession.createSubscriber(consumerTopic); } } + + consumer.setMessageListener(this); + return consumer; } protected synchronized MessageProducer createProducer() throws JMSException { + if (producerConnection == null) return null; producerSession = producerConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); producer = producerSession.createPublisher(null); return producer; } protected synchronized void sendMessage(Message message) throws JMSException { - if (producer == null) { - createProducer(); + if (producer == null && createProducer() == null) { + throw new JMSException("Producer for remote queue not available."); + } + try { + producer.publish(producerTopic, message); + } catch (JMSException e) { + producer = null; + throw e; } - producer.publish(producerTopic, message); } /** @@ -100,6 +109,13 @@ class TopicBridge extends DestinationBridge { */ public void setConsumerConnection(TopicConnection consumerConnection) { this.consumerConnection = consumerConnection; + if (started.get()) { + try { + createConsumer(); + } catch(Exception e) { + jmsConnector.handleConnectionFailure(getConnnectionForConsumer()); + } + } } /** diff --git a/activemq-core/src/test/java/org/apache/activemq/network/jms/QueueBridgeStandaloneReconnectTest.java b/activemq-core/src/test/java/org/apache/activemq/network/jms/QueueBridgeStandaloneReconnectTest.java new file mode 100644 index 0000000000..3004884a42 --- /dev/null +++ b/activemq-core/src/test/java/org/apache/activemq/network/jms/QueueBridgeStandaloneReconnectTest.java @@ -0,0 +1,366 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.network.jms; + +import static org.junit.Assert.*; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.util.Wait; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.jms.*; +import java.util.ArrayList; +import java.util.Iterator; + +public class QueueBridgeStandaloneReconnectTest { + + private static final Logger LOG = LoggerFactory.getLogger(QueueBridgeStandaloneReconnectTest.class); + + private JmsQueueConnector jmsQueueConnector; + + private BrokerService localBroker; + private BrokerService foreignBroker; + + private ActiveMQConnectionFactory localConnectionFactory; + private ActiveMQConnectionFactory foreignConnectionFactory; + + private Destination outbound; + private Destination inbound; + + private ArrayList connections = new ArrayList(); + + @Test + public void testSendAndReceiveOverConnectedBridges() throws Exception { + + startLocalBroker(); + startForeignBroker(); + + jmsQueueConnector.start(); + + sendMessageToForeignBroker("to.foreign.broker"); + sendMessageToLocalBroker("to.local.broker"); + + final MessageConsumer local = createConsumerForLocalBroker(); + + assertTrue("Should have received a Message.", Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + Message message = local.receive(100); + if (message != null && ((TextMessage) message).getText().equals("to.local.broker")) { + return true; + } + return false; + } + })); + + final MessageConsumer foreign = createConsumerForForeignBroker(); + + assertTrue("Should have received a Message.", Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + Message message = foreign.receive(100); + if (message != null && ((TextMessage) message).getText().equals("to.foreign.broker")) { + return true; + } + return false; + } + })); + } + + @Test + public void testSendAndReceiveOverBridgeWhenStartedBeforeBrokers() throws Exception { + + jmsQueueConnector.start(); + + startLocalBroker(); + startForeignBroker(); + + assertTrue("Should have Connected.", Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + return jmsQueueConnector.isConnected(); + } + })); + + sendMessageToForeignBroker("to.foreign.broker"); + sendMessageToLocalBroker("to.local.broker"); + + final MessageConsumer local = createConsumerForLocalBroker(); + + assertTrue("Should have received a Message.", Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + Message message = local.receive(100); + if (message != null && ((TextMessage) message).getText().equals("to.local.broker")) { + return true; + } + return false; + } + })); + + final MessageConsumer foreign = createConsumerForForeignBroker(); + + assertTrue("Should have received a Message.", Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + Message message = foreign.receive(100); + if (message != null && ((TextMessage) message).getText().equals("to.foreign.broker")) { + return true; + } + return false; + } + })); + } + + @Test + public void testSendAndReceiveOverBridgeWithRestart() throws Exception { + + startLocalBroker(); + startForeignBroker(); + + jmsQueueConnector.start(); + + assertTrue("Should have Connected.", Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + return jmsQueueConnector.isConnected(); + } + })); + + stopLocalBroker(); + stopForeignBroker(); + + assertTrue("Should have detected connection drop.", Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + return !jmsQueueConnector.isConnected(); + } + })); + + startLocalBroker(); + startForeignBroker(); + + assertTrue("Should have Re-Connected.", Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + return jmsQueueConnector.isConnected(); + } + })); + + sendMessageToForeignBroker("to.foreign.broker"); + sendMessageToLocalBroker("to.local.broker"); + + final MessageConsumer local = createConsumerForLocalBroker(); + + assertTrue("Should have received a Message.", Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + Message message = local.receive(100); + if (message != null && ((TextMessage) message).getText().equals("to.local.broker")) { + return true; + } + return false; + } + })); + + final MessageConsumer foreign = createConsumerForForeignBroker(); + + assertTrue("Should have received a Message.", Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + Message message = foreign.receive(100); + if (message != null && ((TextMessage) message).getText().equals("to.foreign.broker")) { + return true; + } + return false; + } + })); + } + + @Before + public void setUp() throws Exception { + + localConnectionFactory = createLocalConnectionFactory(); + foreignConnectionFactory = createForeignConnectionFactory(); + + outbound = new ActiveMQQueue("RECONNECT.TEST.OUT.QUEUE"); + inbound = new ActiveMQQueue("RECONNECT.TEST.IN.QUEUE"); + + jmsQueueConnector = new JmsQueueConnector(); + + // Wire the bridges. + jmsQueueConnector.setOutboundQueueBridges( + new OutboundQueueBridge[] {new OutboundQueueBridge("RECONNECT.TEST.OUT.QUEUE")}); + jmsQueueConnector.setInboundQueueBridges( + new InboundQueueBridge[] {new InboundQueueBridge("RECONNECT.TEST.IN.QUEUE")}); + + // Tell it how to reach the two brokers. + jmsQueueConnector.setOutboundQueueConnectionFactory( + new ActiveMQConnectionFactory("tcp://localhost:61617")); + jmsQueueConnector.setLocalQueueConnectionFactory( + new ActiveMQConnectionFactory("tcp://localhost:61616")); + } + + @After + public void tearDown() throws Exception { + disposeConsumerConnections(); + + try { + jmsQueueConnector.stop(); + jmsQueueConnector = null; + } catch (Exception e) { + } + + try { + stopLocalBroker(); + } catch (Throwable e) { + } + try { + stopForeignBroker(); + } catch (Throwable e) { + } + } + + protected void disposeConsumerConnections() { + for (Iterator iter = connections.iterator(); iter.hasNext();) { + Connection connection = iter.next(); + try { + connection.close(); + } catch (Throwable ignore) { + } + } + } + + protected void startLocalBroker() throws Exception { + if (localBroker == null) { + localBroker = createFirstBroker(); + localBroker.start(); + localBroker.waitUntilStarted(); + } + } + + protected void stopLocalBroker() throws Exception { + if (localBroker != null) { + localBroker.stop(); + localBroker.waitUntilStopped(); + localBroker = null; + } + } + + protected void startForeignBroker() throws Exception { + if (foreignBroker == null) { + foreignBroker = createSecondBroker(); + foreignBroker.start(); + foreignBroker.waitUntilStarted(); + } + } + + protected void stopForeignBroker() throws Exception { + if (foreignBroker != null) { + foreignBroker.stop(); + foreignBroker.waitUntilStopped(); + foreignBroker = null; + } + } + + protected BrokerService createFirstBroker() throws Exception { + BrokerService broker = new BrokerService(); + broker.setBrokerName("broker1"); + broker.setPersistent(false); + broker.setUseJmx(false); + broker.addConnector("tcp://localhost:61616"); + + return broker; + } + + protected BrokerService createSecondBroker() throws Exception { + + BrokerService broker = new BrokerService(); + broker.setBrokerName("broker2"); + broker.setPersistent(false); + broker.setUseJmx(false); + broker.addConnector("tcp://localhost:61617"); + + return broker; + } + + protected ActiveMQConnectionFactory createLocalConnectionFactory() { + return new ActiveMQConnectionFactory("tcp://localhost:61616"); + } + + protected ActiveMQConnectionFactory createForeignConnectionFactory() { + return new ActiveMQConnectionFactory("tcp://localhost:61617"); + } + + protected void sendMessageToForeignBroker(String text) throws JMSException { + Connection connection = null; + try { + connection = localConnectionFactory.createConnection(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = session.createProducer(outbound); + TextMessage message = session.createTextMessage(); + message.setText(text); + producer.send(message); + } finally { + try { + connection.close(); + } catch (Throwable ignore) { + } + } + } + + protected void sendMessageToLocalBroker(String text) throws JMSException { + Connection connection = null; + try { + connection = foreignConnectionFactory.createConnection(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = session.createProducer(inbound); + TextMessage message = session.createTextMessage(); + message.setText(text); + producer.send(message); + } finally { + try { + connection.close(); + } catch (Throwable ignore) { + } + } + } + + protected MessageConsumer createConsumerForLocalBroker() throws JMSException { + Connection connection = localConnectionFactory.createConnection(); + connections.add(connection); + connection.start(); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + return session.createConsumer(inbound); + } + + protected MessageConsumer createConsumerForForeignBroker() throws JMSException { + Connection connection = foreignConnectionFactory.createConnection(); + connections.add(connection); + connection.start(); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + return session.createConsumer(outbound); + } +} diff --git a/activemq-core/src/test/java/org/apache/activemq/network/jms/QueueOutboundBridgeReconnectTest.java b/activemq-core/src/test/java/org/apache/activemq/network/jms/QueueOutboundBridgeReconnectTest.java new file mode 100644 index 0000000000..5895ccab6e --- /dev/null +++ b/activemq-core/src/test/java/org/apache/activemq/network/jms/QueueOutboundBridgeReconnectTest.java @@ -0,0 +1,338 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.network.jms; + +import static org.junit.Assert.*; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.util.Wait; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.jms.*; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.concurrent.TimeUnit; + +/** + * These test cases are used to verify that queue outbound bridge connections get + * re-established in all broker restart scenarios. This is possible when the + * outbound bridge is configured using the failover URI with a timeout. + */ +public class QueueOutboundBridgeReconnectTest { + + private static final Logger LOG = LoggerFactory.getLogger(QueueOutboundBridgeReconnectTest.class); + + private BrokerService producerBroker; + private BrokerService consumerBroker; + private ActiveMQConnectionFactory producerConnectionFactory; + private ActiveMQConnectionFactory consumerConnectionFactory; + private Destination destination; + private ArrayList connections = new ArrayList(); + + @Test + public void testMultipleProducerBrokerRestarts() throws Exception { + for (int i = 0; i < 10; i++) { + testWithProducerBrokerRestart(); + disposeConsumerConnections(); + } + } + + @Test + public void testRestartProducerWithNoConsumer() throws Exception { + stopConsumerBroker(); + + startProducerBroker(); + sendMessage("test123"); + sendMessage("test456"); + } + + @Test + public void testWithoutRestartsConsumerFirst() throws Exception { + startConsumerBroker(); + startProducerBroker(); + sendMessage("test123"); + sendMessage("test456"); + + MessageConsumer consumer = createConsumer(); + Message message = consumer.receive(3000); + assertNotNull(message); + assertEquals("test123", ((TextMessage)message).getText()); + + message = consumer.receive(3000); + assertNotNull(message); + assertEquals("test456", ((TextMessage)message).getText()); + + assertNull(consumer.receiveNoWait()); + } + + @Test + public void testWithoutRestartsProducerFirst() throws Exception { + startProducerBroker(); + sendMessage("test123"); + + startConsumerBroker(); + + // unless using a failover URI, the first attempt of this send will likely fail, + // so increase the timeout below to give the bridge time to recover + sendMessage("test456"); + + MessageConsumer consumer = createConsumer(); + Message message = consumer.receive(5000); + assertNotNull(message); + assertEquals("test123", ((TextMessage) message).getText()); + + message = consumer.receive(5000); + assertNotNull(message); + assertEquals("test456", ((TextMessage) message).getText()); + + assertNull(consumer.receiveNoWait()); + } + + @Test + public void testWithProducerBrokerRestart() throws Exception { + startProducerBroker(); + startConsumerBroker(); + + sendMessage("test123"); + + MessageConsumer consumer = createConsumer(); + Message message = consumer.receive(5000); + assertNotNull(message); + assertEquals("test123", ((TextMessage)message).getText()); + assertNull(consumer.receiveNoWait()); + + // Restart the first broker... + stopProducerBroker(); + startProducerBroker(); + + sendMessage("test123"); + message = consumer.receive(5000); + assertNotNull(message); + assertEquals("test123", ((TextMessage)message).getText()); + assertNull(consumer.receiveNoWait()); + } + + @Test + public void testWithConsumerBrokerRestart() throws Exception { + + startProducerBroker(); + startConsumerBroker(); + + sendMessage("test123"); + + final MessageConsumer consumer1 = createConsumer(); + Message message = consumer1.receive(5000); + assertNotNull(message); + assertEquals("test123", ((TextMessage)message).getText()); + assertNull(consumer1.receiveNoWait()); + consumer1.close(); + + // Restart the first broker... + stopConsumerBroker(); + startConsumerBroker(); + + // unless using a failover URI, the first attempt of this send will likely fail, + // so increase the timeout below to give the bridge time to recover + sendMessage("test123"); + + final MessageConsumer consumer2 = createConsumer(); + assertTrue("Expected recover and delivery failed", Wait.waitFor(new Wait.Condition() { + + @Override + public boolean isSatisified() throws Exception { + Message message = consumer2.receiveNoWait(); + if (message == null || !((TextMessage)message).getText().equals("test123")) { + return false; + } + return true; + } + })); + assertNull(consumer2.receiveNoWait()); + } + + @Test + public void testWithConsumerBrokerStartDelay() throws Exception { + + startConsumerBroker(); + final MessageConsumer consumer = createConsumer(); + + TimeUnit.SECONDS.sleep(5); + + startProducerBroker(); + + sendMessage("test123"); + assertTrue("Expected recover and delivery failed", Wait.waitFor(new Wait.Condition() { + + @Override + public boolean isSatisified() throws Exception { + Message message = consumer.receiveNoWait(); + if (message == null || !((TextMessage)message).getText().equals("test123")) { + return false; + } + return true; + } + })); + assertNull(consumer.receiveNoWait()); + } + + @Test + public void testWithProducerBrokerStartDelay() throws Exception { + + startProducerBroker(); + + TimeUnit.SECONDS.sleep(5); + + startConsumerBroker(); + MessageConsumer consumer = createConsumer(); + + sendMessage("test123"); + Message message = consumer.receive(5000); + assertNotNull(message); + assertEquals("test123", ((TextMessage)message).getText()); + assertNull(consumer.receiveNoWait()); + } + + @Before + public void setUp() throws Exception { + + producerConnectionFactory = createProducerConnectionFactory(); + consumerConnectionFactory = createConsumerConnectionFactory(); + destination = new ActiveMQQueue("RECONNECT.TEST.QUEUE"); + } + + @After + public void tearDown() throws Exception { + disposeConsumerConnections(); + try { + stopProducerBroker(); + } catch (Throwable e) { + } + try { + stopConsumerBroker(); + } catch (Throwable e) { + } + } + + protected void disposeConsumerConnections() { + for (Iterator iter = connections.iterator(); iter.hasNext();) { + Connection connection = iter.next(); + try { + connection.close(); + } catch (Throwable ignore) { + } + } + } + + protected void startProducerBroker() throws Exception { + if (producerBroker == null) { + producerBroker = createFirstBroker(); + producerBroker.start(); + } + } + + protected void stopProducerBroker() throws Exception { + if (producerBroker != null) { + producerBroker.stop(); + producerBroker = null; + } + } + + protected void startConsumerBroker() throws Exception { + if (consumerBroker == null) { + consumerBroker = createSecondBroker(); + consumerBroker.start(); + } + } + + protected void stopConsumerBroker() throws Exception { + if (consumerBroker != null) { + consumerBroker.stop(); + consumerBroker = null; + } + } + + protected BrokerService createFirstBroker() throws Exception { + BrokerService broker = new BrokerService(); + broker.setBrokerName("broker1"); + broker.setPersistent(false); + broker.setUseJmx(false); + broker.addConnector("tcp://localhost:61616"); + broker.addConnector("vm://broker1"); + + JmsQueueConnector jmsQueueConnector = new JmsQueueConnector(); + jmsQueueConnector.setOutboundQueueBridges( + new OutboundQueueBridge[] {new OutboundQueueBridge("RECONNECT.TEST.QUEUE")}); + jmsQueueConnector.setOutboundQueueConnectionFactory( + new ActiveMQConnectionFactory("tcp://localhost:61617")); + + broker.setJmsBridgeConnectors(new JmsConnector[]{jmsQueueConnector}); + + return broker; + } + + protected BrokerService createSecondBroker() throws Exception { + + BrokerService broker = new BrokerService(); + broker.setBrokerName("broker2"); + broker.setPersistent(false); + broker.setUseJmx(false); + broker.addConnector("tcp://localhost:61617"); + broker.addConnector("vm://broker2"); + + return broker; + } + + protected ActiveMQConnectionFactory createProducerConnectionFactory() { + return new ActiveMQConnectionFactory("vm://broker1"); + } + + protected ActiveMQConnectionFactory createConsumerConnectionFactory() { + return new ActiveMQConnectionFactory("vm://broker2"); + } + + protected void sendMessage(String text) throws JMSException { + Connection connection = null; + try { + connection = producerConnectionFactory.createConnection(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = session.createProducer(destination); + TextMessage message = session.createTextMessage(); + message.setText(text); + producer.send(message); + } finally { + try { + connection.close(); + } catch (Throwable ignore) { + } + } + } + + protected MessageConsumer createConsumer() throws JMSException { + Connection connection = consumerConnectionFactory.createConnection(); + connections.add(connection); + connection.start(); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + return session.createConsumer(destination); + } +} diff --git a/activemq-core/src/test/java/org/apache/activemq/network/jms/TopicBridgeStandaloneReconnectTest.java b/activemq-core/src/test/java/org/apache/activemq/network/jms/TopicBridgeStandaloneReconnectTest.java new file mode 100644 index 0000000000..49360125b0 --- /dev/null +++ b/activemq-core/src/test/java/org/apache/activemq/network/jms/TopicBridgeStandaloneReconnectTest.java @@ -0,0 +1,363 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.network.jms; + +import static org.junit.Assert.*; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.command.ActiveMQTopic; +import org.apache.activemq.util.Wait; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.jms.*; +import java.util.ArrayList; +import java.util.Iterator; + +public class TopicBridgeStandaloneReconnectTest { + + private static final Logger LOG = LoggerFactory.getLogger(TopicBridgeStandaloneReconnectTest.class); + + private JmsTopicConnector jmsTopicConnector; + + private BrokerService localBroker; + private BrokerService foreignBroker; + + private ActiveMQConnectionFactory localConnectionFactory; + private ActiveMQConnectionFactory foreignConnectionFactory; + + private Destination outbound; + private Destination inbound; + + private ArrayList connections = new ArrayList(); + + @Test + public void testSendAndReceiveOverConnectedBridges() throws Exception { + + startLocalBroker(); + startForeignBroker(); + + jmsTopicConnector.start(); + + final MessageConsumer local = createConsumerForLocalBroker(); + final MessageConsumer foreign = createConsumerForForeignBroker(); + + sendMessageToForeignBroker("to.foreign.broker"); + sendMessageToLocalBroker("to.local.broker"); + + assertTrue("Should have received a Message.", Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + Message message = local.receive(100); + if (message != null && ((TextMessage) message).getText().equals("to.local.broker")) { + return true; + } + return false; + } + })); + + assertTrue("Should have received a Message.", Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + Message message = foreign.receive(100); + if (message != null && ((TextMessage) message).getText().equals("to.foreign.broker")) { + return true; + } + return false; + } + })); + } + + @Test + public void testSendAndReceiveOverBridgeWhenStartedBeforeBrokers() throws Exception { + + jmsTopicConnector.start(); + + startLocalBroker(); + startForeignBroker(); + + assertTrue("Should have Connected.", Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + return jmsTopicConnector.isConnected(); + } + })); + + final MessageConsumer local = createConsumerForLocalBroker(); + final MessageConsumer foreign = createConsumerForForeignBroker(); + + sendMessageToForeignBroker("to.foreign.broker"); + sendMessageToLocalBroker("to.local.broker"); + + assertTrue("Should have received a Message.", Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + Message message = local.receive(100); + if (message != null && ((TextMessage) message).getText().equals("to.local.broker")) { + return true; + } + return false; + } + })); + + assertTrue("Should have received a Message.", Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + Message message = foreign.receive(100); + if (message != null && ((TextMessage) message).getText().equals("to.foreign.broker")) { + return true; + } + return false; + } + })); + } + + @Test + public void testSendAndReceiveOverBridgeWithRestart() throws Exception { + + startLocalBroker(); + startForeignBroker(); + + jmsTopicConnector.start(); + + assertTrue("Should have Connected.", Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + return jmsTopicConnector.isConnected(); + } + })); + + stopLocalBroker(); + stopForeignBroker(); + + assertTrue("Should have detected connection drop.", Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + return !jmsTopicConnector.isConnected(); + } + })); + + startLocalBroker(); + startForeignBroker(); + + assertTrue("Should have Re-Connected.", Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + return jmsTopicConnector.isConnected(); + } + })); + + final MessageConsumer local = createConsumerForLocalBroker(); + final MessageConsumer foreign = createConsumerForForeignBroker(); + + sendMessageToForeignBroker("to.foreign.broker"); + sendMessageToLocalBroker("to.local.broker"); + + assertTrue("Should have received a Message.", Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + Message message = local.receive(100); + if (message != null && ((TextMessage) message).getText().equals("to.local.broker")) { + return true; + } + return false; + } + })); + + assertTrue("Should have received a Message.", Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + Message message = foreign.receive(100); + if (message != null && ((TextMessage) message).getText().equals("to.foreign.broker")) { + return true; + } + return false; + } + })); + } + + @Before + public void setUp() throws Exception { + + localConnectionFactory = createLocalConnectionFactory(); + foreignConnectionFactory = createForeignConnectionFactory(); + + outbound = new ActiveMQTopic("RECONNECT.TEST.OUT.TOPIC"); + inbound = new ActiveMQTopic("RECONNECT.TEST.IN.TOPIC"); + + jmsTopicConnector = new JmsTopicConnector(); + + // Wire the bridges. + jmsTopicConnector.setOutboundTopicBridges( + new OutboundTopicBridge[] {new OutboundTopicBridge("RECONNECT.TEST.OUT.TOPIC")}); + jmsTopicConnector.setInboundTopicBridges( + new InboundTopicBridge[] {new InboundTopicBridge("RECONNECT.TEST.IN.TOPIC")}); + + // Tell it how to reach the two brokers. + jmsTopicConnector.setOutboundTopicConnectionFactory( + new ActiveMQConnectionFactory("tcp://localhost:61617")); + jmsTopicConnector.setLocalTopicConnectionFactory( + new ActiveMQConnectionFactory("tcp://localhost:61616")); + } + + @After + public void tearDown() throws Exception { + disposeConsumerConnections(); + + try { + jmsTopicConnector.stop(); + jmsTopicConnector = null; + } catch (Exception e) { + } + + try { + stopLocalBroker(); + } catch (Throwable e) { + } + try { + stopForeignBroker(); + } catch (Throwable e) { + } + } + + protected void disposeConsumerConnections() { + for (Iterator iter = connections.iterator(); iter.hasNext();) { + Connection connection = iter.next(); + try { + connection.close(); + } catch (Throwable ignore) { + } + } + } + + protected void startLocalBroker() throws Exception { + if (localBroker == null) { + localBroker = createFirstBroker(); + localBroker.start(); + localBroker.waitUntilStarted(); + } + } + + protected void stopLocalBroker() throws Exception { + if (localBroker != null) { + localBroker.stop(); + localBroker.waitUntilStopped(); + localBroker = null; + } + } + + protected void startForeignBroker() throws Exception { + if (foreignBroker == null) { + foreignBroker = createSecondBroker(); + foreignBroker.start(); + foreignBroker.waitUntilStarted(); + } + } + + protected void stopForeignBroker() throws Exception { + if (foreignBroker != null) { + foreignBroker.stop(); + foreignBroker.waitUntilStopped(); + foreignBroker = null; + } + } + + protected BrokerService createFirstBroker() throws Exception { + BrokerService broker = new BrokerService(); + broker.setBrokerName("broker1"); + broker.setPersistent(false); + broker.setUseJmx(false); + broker.addConnector("tcp://localhost:61616"); + + return broker; + } + + protected BrokerService createSecondBroker() throws Exception { + + BrokerService broker = new BrokerService(); + broker.setBrokerName("broker2"); + broker.setPersistent(false); + broker.setUseJmx(false); + broker.addConnector("tcp://localhost:61617"); + + return broker; + } + + protected ActiveMQConnectionFactory createLocalConnectionFactory() { + return new ActiveMQConnectionFactory("tcp://localhost:61616"); + } + + protected ActiveMQConnectionFactory createForeignConnectionFactory() { + return new ActiveMQConnectionFactory("tcp://localhost:61617"); + } + + protected void sendMessageToForeignBroker(String text) throws JMSException { + Connection connection = null; + try { + connection = localConnectionFactory.createConnection(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = session.createProducer(outbound); + TextMessage message = session.createTextMessage(); + message.setText(text); + producer.send(message); + } finally { + try { + connection.close(); + } catch (Throwable ignore) { + } + } + } + + protected void sendMessageToLocalBroker(String text) throws JMSException { + Connection connection = null; + try { + connection = foreignConnectionFactory.createConnection(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = session.createProducer(inbound); + TextMessage message = session.createTextMessage(); + message.setText(text); + producer.send(message); + } finally { + try { + connection.close(); + } catch (Throwable ignore) { + } + } + } + + protected MessageConsumer createConsumerForLocalBroker() throws JMSException { + Connection connection = localConnectionFactory.createConnection(); + connections.add(connection); + connection.start(); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + return session.createConsumer(inbound); + } + + protected MessageConsumer createConsumerForForeignBroker() throws JMSException { + Connection connection = foreignConnectionFactory.createConnection(); + connections.add(connection); + connection.start(); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + return session.createConsumer(outbound); + } +} diff --git a/activemq-core/src/test/java/org/apache/activemq/network/jms/TopicOutboundBridgeReconnectTest.java b/activemq-core/src/test/java/org/apache/activemq/network/jms/TopicOutboundBridgeReconnectTest.java new file mode 100644 index 0000000000..27e7a636d6 --- /dev/null +++ b/activemq-core/src/test/java/org/apache/activemq/network/jms/TopicOutboundBridgeReconnectTest.java @@ -0,0 +1,326 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.network.jms; + +import static org.junit.Assert.*; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.command.ActiveMQTopic; +import org.apache.activemq.util.Wait; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.jms.*; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.concurrent.TimeUnit; + +/** + * These test cases are used to verify that queue outbound bridge connections get + * re-established in all broker restart scenarios. This is possible when the + * outbound bridge is configured using the failover URI with a timeout. + */ +public class TopicOutboundBridgeReconnectTest { + + private static final Logger LOG = LoggerFactory.getLogger(TopicOutboundBridgeReconnectTest.class); + + private BrokerService producerBroker; + private BrokerService consumerBroker; + private ActiveMQConnectionFactory producerConnectionFactory; + private ActiveMQConnectionFactory consumerConnectionFactory; + private Destination destination; + private ArrayList connections = new ArrayList(); + + @Test + public void testMultipleProducerBrokerRestarts() throws Exception { + for (int i = 0; i < 10; i++) { + testWithProducerBrokerRestart(); + disposeConsumerConnections(); + } + } + + @Test + public void testWithoutRestartsConsumerFirst() throws Exception { + startConsumerBroker(); + startProducerBroker(); + + MessageConsumer consumer = createConsumer(); + + sendMessage("test123"); + sendMessage("test456"); + Message message = consumer.receive(2000); + assertNotNull(message); + assertEquals("test123", ((TextMessage)message).getText()); + + message = consumer.receive(5000); + assertNotNull(message); + assertEquals("test456", ((TextMessage)message).getText()); + + assertNull(consumer.receiveNoWait()); + } + + @Test + public void testWithoutRestartsProducerFirst() throws Exception { + startProducerBroker(); + sendMessage("test123"); + + startConsumerBroker(); + + // unless using a failover URI, the first attempt of this send will likely fail, so increase the timeout below + // to give the bridge time to recover + sendMessage("test456"); + + MessageConsumer consumer = createConsumer(); + Message message = consumer.receive(5000); + assertNotNull(message); + assertEquals("test123", ((TextMessage) message).getText()); + + message = consumer.receive(5000); + assertNotNull(message); + assertEquals("test456", ((TextMessage) message).getText()); + + assertNull(consumer.receiveNoWait()); + } + + @Test + public void testWithProducerBrokerRestart() throws Exception { + startProducerBroker(); + startConsumerBroker(); + + MessageConsumer consumer = createConsumer(); + + sendMessage("test123"); + Message message = consumer.receive(5000); + assertNotNull(message); + assertEquals("test123", ((TextMessage)message).getText()); + assertNull(consumer.receiveNoWait()); + + // Restart the first broker... + stopProducerBroker(); + startProducerBroker(); + + sendMessage("test123"); + message = consumer.receive(5000); + assertNotNull(message); + assertEquals("test123", ((TextMessage)message).getText()); + assertNull(consumer.receiveNoWait()); + } + + @Test + public void testWithConsumerBrokerRestart() throws Exception { + startProducerBroker(); + startConsumerBroker(); + + final MessageConsumer consumer1 = createConsumer(); + + sendMessage("test123"); + Message message = consumer1.receive(5000); + assertNotNull(message); + assertEquals("test123", ((TextMessage)message).getText()); + assertNull(consumer1.receiveNoWait()); + consumer1.close(); + + // Restart the first broker... + stopConsumerBroker(); + startConsumerBroker(); + + // unless using a failover URI, the first attempt of this send will likely fail, so increase the timeout below + // to give the bridge time to recover + sendMessage("test123"); + + final MessageConsumer consumer2 = createConsumer(); + assertTrue("Expected recover and delivery failed", Wait.waitFor(new Wait.Condition() { + + @Override + public boolean isSatisified() throws Exception { + Message message = consumer2.receiveNoWait(); + if (message == null || !((TextMessage)message).getText().equals("test123")) { + return false; + } + return true; + } + })); + assertNull(consumer2.receiveNoWait()); + } + + @Test + public void testWithConsumerBrokerStartDelay() throws Exception { + startConsumerBroker(); + final MessageConsumer consumer = createConsumer(); + + TimeUnit.SECONDS.sleep(5); + + startProducerBroker(); + + sendMessage("test123"); + assertTrue("Expected recover and delivery failed", Wait.waitFor(new Wait.Condition() { + + @Override + public boolean isSatisified() throws Exception { + Message message = consumer.receiveNoWait(); + if (message == null || !((TextMessage)message).getText().equals("test123")) { + return false; + } + return true; + } + })); + assertNull(consumer.receiveNoWait()); + } + + @Test + public void testWithProducerBrokerStartDelay() throws Exception { + startProducerBroker(); + + TimeUnit.SECONDS.sleep(5); + + startConsumerBroker(); + MessageConsumer consumer = createConsumer(); + + sendMessage("test123"); + Message message = consumer.receive(2000); + assertNotNull(message); + assertEquals("test123", ((TextMessage)message).getText()); + assertNull(consumer.receiveNoWait()); + } + + @Before + public void setUp() throws Exception { + producerConnectionFactory = createProducerConnectionFactory(); + consumerConnectionFactory = createConsumerConnectionFactory(); + destination = new ActiveMQTopic("RECONNECT.TEST.TOPIC"); + } + + @After + public void tearDown() throws Exception { + disposeConsumerConnections(); + try { + stopProducerBroker(); + } catch (Throwable e) { + } + try { + stopConsumerBroker(); + } catch (Throwable e) { + } + } + + protected void disposeConsumerConnections() { + for (Iterator iter = connections.iterator(); iter.hasNext();) { + Connection connection = iter.next(); + try { + connection.close(); + } catch (Throwable ignore) { + } + } + } + + protected void startProducerBroker() throws Exception { + if (producerBroker == null) { + producerBroker = createFirstBroker(); + producerBroker.start(); + } + } + + protected void stopProducerBroker() throws Exception { + if (producerBroker != null) { + producerBroker.stop(); + producerBroker = null; + } + } + + protected void startConsumerBroker() throws Exception { + if (consumerBroker == null) { + consumerBroker = createSecondBroker(); + consumerBroker.start(); + } + } + + protected void stopConsumerBroker() throws Exception { + if (consumerBroker != null) { + consumerBroker.stop(); + consumerBroker = null; + } + } + + protected BrokerService createFirstBroker() throws Exception { + BrokerService broker = new BrokerService(); + broker.setBrokerName("broker1"); + broker.setPersistent(false); + broker.setUseJmx(false); + broker.addConnector("tcp://localhost:61616"); + broker.addConnector("vm://broker1"); + + JmsTopicConnector jmsTopicConnector = new JmsTopicConnector(); + jmsTopicConnector.setOutboundTopicBridges( + new OutboundTopicBridge[] {new OutboundTopicBridge("RECONNECT.TEST.TOPIC")}); + jmsTopicConnector.setOutboundTopicConnectionFactory( + new ActiveMQConnectionFactory("tcp://localhost:61617")); + + broker.setJmsBridgeConnectors(new JmsConnector[]{jmsTopicConnector}); + + return broker; + } + + protected BrokerService createSecondBroker() throws Exception { + BrokerService broker = new BrokerService(); + broker.setBrokerName("broker2"); + broker.setPersistent(false); + broker.setUseJmx(false); + broker.addConnector("tcp://localhost:61617"); + broker.addConnector("vm://broker2"); + + return broker; + } + + protected ActiveMQConnectionFactory createProducerConnectionFactory() { + return new ActiveMQConnectionFactory("vm://broker1"); + } + + protected ActiveMQConnectionFactory createConsumerConnectionFactory() { + return new ActiveMQConnectionFactory("vm://broker2"); + } + + protected void sendMessage(String text) throws JMSException { + Connection connection = null; + try { + connection = producerConnectionFactory.createConnection(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = session.createProducer(destination); + TextMessage message = session.createTextMessage(); + message.setText(text); + producer.send(message); + } finally { + try { + connection.close(); + } catch (Throwable ignore) { + } + } + } + + protected MessageConsumer createConsumer() throws JMSException { + Connection connection = consumerConnectionFactory.createConnection(); + connections.add(connection); + connection.start(); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + return session.createConsumer(destination); + } +}