fix for: https://issues.apache.org/jira/browse/AMQ-2455
fix for: https://issues.apache.org/jira/browse/AMQ-3635

Adds reconnect logic and tests along with a policy class to allow for control over the reconnect process.
Reconnection of both local and foreign side of the JmsConnector is supported.

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1238827 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Timothy A. Bish 2012-01-31 21:56:03 +00:00
parent d1357b4970
commit bc78238ad0
17 changed files with 2353 additions and 326 deletions

View File

@ -24,24 +24,22 @@ import javax.jms.Message;
import javax.jms.MessageConsumer; import javax.jms.MessageConsumer;
import javax.jms.MessageListener; import javax.jms.MessageListener;
import javax.jms.MessageProducer; import javax.jms.MessageProducer;
import javax.naming.NamingException;
import org.apache.activemq.Service; import org.apache.activemq.Service;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
/** /**
* A Destination bridge is used to bridge between to different JMS systems * A Destination bridge is used to bridge between to different JMS systems
*
*
*/ */
public abstract class DestinationBridge implements Service, MessageListener { public abstract class DestinationBridge implements Service, MessageListener {
private static final Logger LOG = LoggerFactory.getLogger(DestinationBridge.class); private static final Logger LOG = LoggerFactory.getLogger(DestinationBridge.class);
protected MessageConsumer consumer; protected MessageConsumer consumer;
protected AtomicBoolean started = new AtomicBoolean(false); protected AtomicBoolean started = new AtomicBoolean(false);
protected JmsMesageConvertor jmsMessageConvertor; protected JmsMesageConvertor jmsMessageConvertor;
protected boolean doHandleReplyTo = true; protected boolean doHandleReplyTo = true;
protected JmsConnector jmsConnector; protected JmsConnector jmsConnector;
private int maximumRetries = 10;
/** /**
* @return Returns the consumer. * @return Returns the consumer.
@ -78,26 +76,13 @@ public abstract class DestinationBridge implements Service, MessageListener {
this.jmsMessageConvertor = jmsMessageConvertor; this.jmsMessageConvertor = jmsMessageConvertor;
} }
public int getMaximumRetries() {
return maximumRetries;
}
/**
* Sets the maximum number of retries if a send fails before closing the
* bridge
*/
public void setMaximumRetries(int maximumRetries) {
this.maximumRetries = maximumRetries;
}
protected Destination processReplyToDestination(Destination destination) { protected Destination processReplyToDestination(Destination destination) {
return jmsConnector.createReplyToBridge(destination, getConnnectionForConsumer(), getConnectionForProducer()); return jmsConnector.createReplyToBridge(destination, getConnnectionForConsumer(), getConnectionForProducer());
} }
public void start() throws Exception { public void start() throws Exception {
if (started.compareAndSet(false, true)) { if (started.compareAndSet(false, true)) {
MessageConsumer consumer = createConsumer(); createConsumer();
consumer.setMessageListener(this);
createProducer(); createProducer();
} }
} }
@ -107,37 +92,60 @@ public abstract class DestinationBridge implements Service, MessageListener {
} }
public void onMessage(Message message) { public void onMessage(Message message) {
int attempt = 0; int attempt = 0;
while (started.get() && message != null) { final int maxRetries = jmsConnector.getReconnectionPolicy().getMaxSendRetries();
while (started.get() && message != null && ++attempt <= maxRetries) {
try { try {
if (attempt > 0) { if (attempt > 0) {
restartProducer(); try {
Thread.sleep(jmsConnector.getReconnectionPolicy().getNextDelay(attempt));
} catch(InterruptedException e) {
break;
}
} }
Message converted; Message converted;
if (doHandleReplyTo) { if (jmsMessageConvertor != null) {
Destination replyTo = message.getJMSReplyTo(); if (doHandleReplyTo) {
if (replyTo != null) { Destination replyTo = message.getJMSReplyTo();
converted = jmsMessageConvertor.convert(message, processReplyToDestination(replyTo)); if (replyTo != null) {
converted = jmsMessageConvertor.convert(message, processReplyToDestination(replyTo));
} else {
converted = jmsMessageConvertor.convert(message);
}
} else { } else {
message.setJMSReplyTo(null);
converted = jmsMessageConvertor.convert(message); converted = jmsMessageConvertor.convert(message);
} }
} else { } else {
message.setJMSReplyTo(null); // The Producer side is not up or not yet configured, retry.
converted = jmsMessageConvertor.convert(message); continue;
} }
sendMessage(converted);
message.acknowledge(); try {
sendMessage(converted);
} catch(Exception e) {
jmsConnector.handleConnectionFailure(getConnectionForProducer());
continue;
}
try {
message.acknowledge();
} catch(Exception e) {
jmsConnector.handleConnectionFailure(getConnnectionForConsumer());
continue;
}
// if we got here then it made it out and was ack'd
return; return;
} catch (Exception e) { } catch (Exception e) {
LOG.error("failed to forward message on attempt: " + (++attempt) + " reason: " + e + " message: " + message, e); LOG.info("failed to forward message on attempt: " + attempt +
if (maximumRetries > 0 && attempt >= maximumRetries) { " reason: " + e + " message: " + message, e);
try {
stop();
} catch (Exception e1) {
LOG.warn("Failed to stop cleanly", e1);
}
}
} }
} }
} }
@ -166,15 +174,4 @@ public abstract class DestinationBridge implements Service, MessageListener {
protected abstract Connection getConnectionForProducer(); protected abstract Connection getConnectionForProducer();
protected void restartProducer() throws JMSException, NamingException {
try {
//don't reconnect immediately
Thread.sleep(1000);
getConnectionForProducer().close();
} catch (Exception e) {
LOG.debug("Ignoring failure to close producer connection: " + e, e);
}
jmsConnector.restartProducerConnection();
createProducer();
}
} }

View File

@ -17,11 +17,12 @@
package org.apache.activemq.network.jms; package org.apache.activemq.network.jms;
/** /**
* Create an Inbound Queue Bridge * Create an Inbound Queue Bridge. By default this class uses the sname name for
* both the inbound and outbound queue. This behavior can be overridden however
* by using the setter methods to configure both the inbound and outboud queue names
* separately.
* *
* @org.apache.xbean.XBean * @org.apache.xbean.XBean
*
*
*/ */
public class InboundQueueBridge extends QueueBridge { public class InboundQueueBridge extends QueueBridge {
@ -29,7 +30,7 @@ public class InboundQueueBridge extends QueueBridge {
String localQueueName; String localQueueName;
/** /**
* Constructor that takes a foriegn destination as an argument * Constructor that takes a foreign destination as an argument
* *
* @param inboundQueueName * @param inboundQueueName
*/ */
@ -39,7 +40,7 @@ public class InboundQueueBridge extends QueueBridge {
} }
/** /**
* Default Contructor * Default Constructor
*/ */
public InboundQueueBridge() { public InboundQueueBridge() {
} }
@ -52,6 +53,10 @@ public class InboundQueueBridge extends QueueBridge {
} }
/** /**
* Sets the queue name used for the inbound queue, if the outbound queue
* name has not been set, then this method uses the same name to configure
* the outbound queue name.
*
* @param inboundQueueName The inboundQueueName to set. * @param inboundQueueName The inboundQueueName to set.
*/ */
public void setInboundQueueName(String inboundQueueName) { public void setInboundQueueName(String inboundQueueName) {
@ -74,5 +79,4 @@ public class InboundQueueBridge extends QueueBridge {
public void setLocalQueueName(String localQueueName) { public void setLocalQueueName(String localQueueName) {
this.localQueueName = localQueueName; this.localQueueName = localQueueName;
} }
} }

View File

@ -17,11 +17,12 @@
package org.apache.activemq.network.jms; package org.apache.activemq.network.jms;
/** /**
* Create an Inbound Topic Bridge * Create an Inbound Topic Bridge. By default this class uses the topic name for
* both the inbound and outbound topic. This behavior can be overridden however
* by using the setter methods to configure both the inbound and outboud topic names
* separately.
* *
* @org.apache.xbean.XBean * @org.apache.xbean.XBean
*
*
*/ */
public class InboundTopicBridge extends TopicBridge { public class InboundTopicBridge extends TopicBridge {
@ -29,7 +30,7 @@ public class InboundTopicBridge extends TopicBridge {
String localTopicName; String localTopicName;
/** /**
* Constructor that takes a foriegn destination as an argument * Constructor that takes a foreign destination as an argument
* *
* @param inboundTopicName * @param inboundTopicName
*/ */
@ -39,7 +40,7 @@ public class InboundTopicBridge extends TopicBridge {
} }
/** /**
* Default Contructor * Default Constructor
*/ */
public InboundTopicBridge() { public InboundTopicBridge() {
} }
@ -52,6 +53,10 @@ public class InboundTopicBridge extends TopicBridge {
} }
/** /**
* Sets the topic name used for the inbound topic, if the outbound topic
* name has not been set, then this method uses the same name to configure
* the outbound topic name.
*
* @param inboundTopicName * @param inboundTopicName
*/ */
public void setInboundTopicName(String inboundTopicName) { public void setInboundTopicName(String inboundTopicName) {
@ -74,5 +79,4 @@ public class InboundTopicBridge extends TopicBridge {
public void setLocalTopicName(String localTopicName) { public void setLocalTopicName(String localTopicName) {
this.localTopicName = localTopicName; this.localTopicName = localTopicName;
} }
} }

View File

@ -20,12 +20,16 @@ import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import javax.jms.Connection; import javax.jms.Connection;
import javax.jms.Destination; import javax.jms.Destination;
import javax.jms.JMSException; import javax.jms.QueueConnection;
import javax.naming.NamingException;
import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.Service; import org.apache.activemq.Service;
@ -37,10 +41,8 @@ import org.springframework.jndi.JndiTemplate;
/** /**
* This bridge joins the gap between foreign JMS providers and ActiveMQ As some * This bridge joins the gap between foreign JMS providers and ActiveMQ As some
* JMS providers are still only 1.0.1 compliant, this bridge itself aimed to be * JMS providers are still only in compliance with JMS v1.0.1 , this bridge itself
* JMS 1.0.2 compliant. * aimed to be in compliance with the JMS 1.0.2 specification.
*
*
*/ */
public abstract class JmsConnector implements Service { public abstract class JmsConnector implements Service {
@ -52,7 +54,12 @@ public abstract class JmsConnector implements Service {
protected JmsMesageConvertor inboundMessageConvertor; protected JmsMesageConvertor inboundMessageConvertor;
protected JmsMesageConvertor outboundMessageConvertor; protected JmsMesageConvertor outboundMessageConvertor;
protected AtomicBoolean initialized = new AtomicBoolean(false); protected AtomicBoolean initialized = new AtomicBoolean(false);
protected AtomicBoolean localSideInitialized = new AtomicBoolean(false);
protected AtomicBoolean foreignSideInitialized = new AtomicBoolean(false);
protected AtomicBoolean started = new AtomicBoolean(false); protected AtomicBoolean started = new AtomicBoolean(false);
protected AtomicBoolean failed = new AtomicBoolean();
protected AtomicReference<Connection> foreignConnection = new AtomicReference<Connection>();
protected AtomicReference<Connection> localConnection = new AtomicReference<Connection>();
protected ActiveMQConnectionFactory embeddedConnectionFactory; protected ActiveMQConnectionFactory embeddedConnectionFactory;
protected int replyToDestinationCacheSize = 10000; protected int replyToDestinationCacheSize = 10000;
protected String outboundUsername; protected String outboundUsername;
@ -61,21 +68,22 @@ public abstract class JmsConnector implements Service {
protected String localPassword; protected String localPassword;
protected String outboundClientId; protected String outboundClientId;
protected String localClientId; protected String localClientId;
protected LRUCache replyToBridges = createLRUCache(); protected LRUCache<Destination, DestinationBridge> replyToBridges = createLRUCache();
private ReconnectionPolicy policy = new ReconnectionPolicy();
protected ThreadPoolExecutor connectionSerivce;
private List<DestinationBridge> inboundBridges = new CopyOnWriteArrayList<DestinationBridge>(); private List<DestinationBridge> inboundBridges = new CopyOnWriteArrayList<DestinationBridge>();
private List<DestinationBridge> outboundBridges = new CopyOnWriteArrayList<DestinationBridge>(); private List<DestinationBridge> outboundBridges = new CopyOnWriteArrayList<DestinationBridge>();
private String name; private String name;
private static LRUCache<Destination, DestinationBridge> createLRUCache() {
private static LRUCache createLRUCache() { return new LRUCache<Destination, DestinationBridge>() {
return new LRUCache() {
private static final long serialVersionUID = -7446792754185879286L; private static final long serialVersionUID = -7446792754185879286L;
protected boolean removeEldestEntry(Map.Entry enty) { protected boolean removeEldestEntry(Map.Entry<Destination, DestinationBridge> enty) {
if (size() > maxCacheSize) { if (size() > maxCacheSize) {
Iterator iter = entrySet().iterator(); Iterator<Map.Entry<Destination, DestinationBridge>> iter = entrySet().iterator();
Map.Entry lru = (Map.Entry)iter.next(); Map.Entry<Destination, DestinationBridge> lru = iter.next();
remove(lru.getKey()); remove(lru.getKey());
DestinationBridge bridge = (DestinationBridge)lru.getValue(); DestinationBridge bridge = (DestinationBridge)lru.getValue();
try { try {
@ -90,8 +98,6 @@ public abstract class JmsConnector implements Service {
}; };
} }
/**
*/
public boolean init() { public boolean init() {
boolean result = initialized.compareAndSet(false, true); boolean result = initialized.compareAndSet(false, true);
if (result) { if (result) {
@ -108,19 +114,49 @@ public abstract class JmsConnector implements Service {
outboundMessageConvertor = new SimpleJmsMessageConvertor(); outboundMessageConvertor = new SimpleJmsMessageConvertor();
} }
replyToBridges.setMaxCacheSize(getReplyToDestinationCacheSize()); replyToBridges.setMaxCacheSize(getReplyToDestinationCacheSize());
connectionSerivce = createExecutor();
// Subclasses can override this to customize their own it.
result = doConnectorInit();
} }
return result; return result;
} }
protected boolean doConnectorInit() {
// We try to make a connection via a sync call first so that the
// JmsConnector is fully initialized before the start call returns
// in order to avoid missing any messages that are dispatched
// immediately after startup. If either side fails we queue an
// asynchronous task to manage the reconnect attempts.
try {
initializeLocalConnection();
localSideInitialized.set(true);
} catch(Exception e) {
// Queue up the task to attempt the local connection.
scheduleAsyncLocalConnectionReconnect();
}
try {
initializeForeignConnection();
foreignSideInitialized.set(true);
} catch(Exception e) {
// Queue up the task for the foreign connection now.
scheduleAsyncForeignConnectionReconnect();
}
return true;
}
public void start() throws Exception { public void start() throws Exception {
init();
if (started.compareAndSet(false, true)) { if (started.compareAndSet(false, true)) {
for (int i = 0; i < inboundBridges.size(); i++) { init();
DestinationBridge bridge = inboundBridges.get(i); for (DestinationBridge bridge : inboundBridges) {
bridge.start(); bridge.start();
} }
for (int i = 0; i < outboundBridges.size(); i++) { for (DestinationBridge bridge : outboundBridges) {
DestinationBridge bridge = outboundBridges.get(i);
bridge.start(); bridge.start();
} }
LOG.info("JMS Connector " + getName() + " Started"); LOG.info("JMS Connector " + getName() + " Started");
@ -129,12 +165,13 @@ public abstract class JmsConnector implements Service {
public void stop() throws Exception { public void stop() throws Exception {
if (started.compareAndSet(true, false)) { if (started.compareAndSet(true, false)) {
for (int i = 0; i < inboundBridges.size(); i++) {
DestinationBridge bridge = inboundBridges.get(i); this.connectionSerivce.shutdown();
for (DestinationBridge bridge : inboundBridges) {
bridge.stop(); bridge.stop();
} }
for (int i = 0; i < outboundBridges.size(); i++) { for (DestinationBridge bridge : outboundBridges) {
DestinationBridge bridge = outboundBridges.get(i);
bridge.stop(); bridge.stop();
} }
LOG.info("JMS Connector " + getName() + " Stopped"); LOG.info("JMS Connector " + getName() + " Stopped");
@ -144,6 +181,7 @@ public abstract class JmsConnector implements Service {
public void clearBridges() { public void clearBridges() {
inboundBridges.clear(); inboundBridges.clear();
outboundBridges.clear(); outboundBridges.clear();
replyToBridges.clear();
} }
protected abstract Destination createReplyToBridge(Destination destination, Connection consumerConnection, Connection producerConnection); protected abstract Destination createReplyToBridge(Destination destination, Connection consumerConnection, Connection producerConnection);
@ -158,6 +196,14 @@ public abstract class JmsConnector implements Service {
embeddedConnectionFactory = new ActiveMQConnectionFactory(service.getVmConnectorURI()); embeddedConnectionFactory = new ActiveMQConnectionFactory(service.getVmConnectorURI());
} }
public Connection getLocalConnection() {
return this.localConnection.get();
}
public Connection getForeignConnection() {
return this.foreignConnection.get();
}
/** /**
* @return Returns the jndiTemplate. * @return Returns the jndiTemplate.
*/ */
@ -222,8 +268,7 @@ public abstract class JmsConnector implements Service {
} }
/** /**
* @param replyToDestinationCacheSize The replyToDestinationCacheSize to * @param replyToDestinationCacheSize The replyToDestinationCacheSize to set.
* set.
*/ */
public void setReplyToDestinationCacheSize(int replyToDestinationCacheSize) { public void setReplyToDestinationCacheSize(int replyToDestinationCacheSize) {
this.replyToDestinationCacheSize = replyToDestinationCacheSize; this.replyToDestinationCacheSize = replyToDestinationCacheSize;
@ -313,13 +358,37 @@ public abstract class JmsConnector implements Service {
this.localClientId = localClientId; this.localClientId = localClientId;
} }
/**
* @return the currently configured reconnection policy.
*/
public ReconnectionPolicy getReconnectionPolicy() {
return this.policy;
}
/**
* @param policy The new reconnection policy this {@link JmsConnector} should use.
*/
public void setReconnectionPolicy(ReconnectionPolicy policy) {
this.policy = policy;
}
/**
* @return returns true if the {@link JmsConnector} is connected to both brokers.
*/
public boolean isConnected() {
return localConnection.get() != null && foreignConnection.get() != null;
}
protected void addInboundBridge(DestinationBridge bridge) { protected void addInboundBridge(DestinationBridge bridge) {
inboundBridges.add(bridge); if (!inboundBridges.contains(bridge)) {
inboundBridges.add(bridge);
}
} }
protected void addOutboundBridge(DestinationBridge bridge) { protected void addOutboundBridge(DestinationBridge bridge) {
outboundBridges.add(bridge); if (!outboundBridges.contains(bridge)) {
outboundBridges.add(bridge);
}
} }
protected void removeInboundBridge(DestinationBridge bridge) { protected void removeInboundBridge(DestinationBridge bridge) {
@ -337,13 +406,205 @@ public abstract class JmsConnector implements Service {
return name; return name;
} }
private static synchronized int getNextId() {
return nextId++;
}
public void setName(String name) { public void setName(String name) {
this.name = name; this.name = name;
} }
public abstract void restartProducerConnection() throws NamingException, JMSException; private static synchronized int getNextId() {
return nextId++;
}
public boolean isFailed() {
return this.failed.get();
}
/**
* Performs the work of connection to the local side of the Connection.
* <p>
* This creates the initial connection to the local end of the {@link JmsConnector}
* and then sets up all the destination bridges with the information needed to bridge
* on the local side of the connection.
*
* @throws Exception if the connection cannot be established for any reason.
*/
protected abstract void initializeLocalConnection() throws Exception;
/**
* Performs the work of connection to the foreign side of the Connection.
* <p>
* This creates the initial connection to the foreign end of the {@link JmsConnector}
* and then sets up all the destination bridges with the information needed to bridge
* on the foreign side of the connection.
*
* @throws Exception if the connection cannot be established for any reason.
*/
protected abstract void initializeForeignConnection() throws Exception;
/**
* Callback method that the Destination bridges can use to report an exception to occurs
* during normal bridging operations.
*
* @param connection
* The connection that was in use when the failure occured.
*/
void handleConnectionFailure(Connection connection) {
// Can happen if async exception listener kicks in at the same time.
if (connection == null || !this.started.get()) {
return;
}
LOG.info("JmsConnector handling loss of connection [" + connection.toString() + "]");
// TODO - How do we handle the re-wiring of replyToBridges in this case.
replyToBridges.clear();
if (this.foreignConnection.compareAndSet((QueueConnection)connection, null)) {
// Stop the inbound bridges when the foreign connection is dropped since
// the bridge has no consumer and needs to be restarted once a new connection
// to the foreign side is made.
for (DestinationBridge bridge : inboundBridges) {
try {
bridge.stop();
} catch(Exception e) {
}
}
// We got here first and cleared the connection, now we queue a reconnect.
this.connectionSerivce.execute(new Runnable() {
@Override
public void run() {
try {
doInitializeConnection(false);
} catch (Exception e) {
LOG.error("Failed to initialize forgein connection for the JMSConnector", e);
}
}
});
} else if (this.localConnection.compareAndSet((QueueConnection)connection, null)) {
// Stop the outbound bridges when the local connection is dropped since
// the bridge has no consumer and needs to be restarted once a new connection
// to the local side is made.
for (DestinationBridge bridge : outboundBridges) {
try {
bridge.stop();
} catch(Exception e) {
}
}
// We got here first and cleared the connection, now we queue a reconnect.
this.connectionSerivce.execute(new Runnable() {
@Override
public void run() {
try {
doInitializeConnection(true);
} catch (Exception e) {
LOG.error("Failed to initialize local connection for the JMSConnector", e);
}
}
});
}
}
private void scheduleAsyncLocalConnectionReconnect() {
this.connectionSerivce.execute(new Runnable() {
@Override
public void run() {
try {
doInitializeConnection(true);
} catch (Exception e) {
LOG.error("Failed to initialize local connection for the JMSConnector", e);
}
}
});
}
private void scheduleAsyncForeignConnectionReconnect() {
this.connectionSerivce.execute(new Runnable() {
@Override
public void run() {
try {
doInitializeConnection(false);
} catch (Exception e) {
LOG.error("Failed to initialize forgein connection for the JMSConnector", e);
}
}
});
}
private void doInitializeConnection(boolean local) throws Exception {
int attempt = 0;
final int maxRetries;
if (local) {
maxRetries = !localSideInitialized.get() ? policy.getMaxInitialConnectAttempts() :
policy.getMaxReconnectAttempts();
} else {
maxRetries = !foreignSideInitialized.get() ? policy.getMaxInitialConnectAttempts() :
policy.getMaxReconnectAttempts();
}
do
{
if (attempt > 0) {
try {
Thread.sleep(policy.getNextDelay(attempt));
} catch(InterruptedException e) {
}
}
if (connectionSerivce.isTerminating()) {
return;
}
try {
if (local) {
initializeLocalConnection();
localSideInitialized.set(true);
} else {
initializeForeignConnection();
foreignSideInitialized.set(true);
}
// Once we are connected we ensure all the bridges are started.
if (localConnection.get() != null && foreignConnection.get() != null) {
for (DestinationBridge bridge : inboundBridges) {
bridge.start();
}
for (DestinationBridge bridge : outboundBridges) {
bridge.start();
}
}
return;
} catch(Exception e) {
LOG.debug("Failed to establish initial " + (local ? "local" : "foriegn") +
" connection for JmsConnector [" + attempt + "]: " + e.getMessage());
}
}
while (maxRetries < ++attempt && !connectionSerivce.isTerminating());
this.failed.set(true);
}
private ThreadFactory factory = new ThreadFactory() {
public Thread newThread(Runnable runnable) {
Thread thread = new Thread(runnable, "JmsConnector Async Connection Task: ");
thread.setDaemon(true);
return thread;
}
};
private ThreadPoolExecutor createExecutor() {
ThreadPoolExecutor exec = new ThreadPoolExecutor(0, 2, 30, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), factory);
exec.allowCoreThreadTimeOut(true);
return exec;
}
} }

View File

@ -23,19 +23,33 @@ import javax.jms.Message;
/** /**
* Converts Message from one JMS to another * Converts Message from one JMS to another
*
*
*/ */
public interface JmsMesageConvertor { public interface JmsMesageConvertor {
/** /**
* Convert a foreign JMS Message to a native ActiveMQ Message * Convert a foreign JMS Message to a native ActiveMQ Message
*
* @param message * @param message
* The target message to convert to a native ActiveMQ message
*
* @return the converted message * @return the converted message
* @throws JMSException * @throws JMSException
*/ */
Message convert(Message message) throws JMSException; Message convert(Message message) throws JMSException;
/**
* Convert a foreign JMS Message to a native ActiveMQ Message - Inbound or
* visa-versa outbound. If the replyTo Destination instance is not null
* then the Message is configured with the given replyTo value.
*
* @param message
* The target message to convert to a native ActiveMQ message
* @param replyTo
* The replyTo Destination to set on the converted Message.
*
* @return the converted message
* @throws JMSException
*/
Message convert(Message message, Destination replyTo) throws JMSException; Message convert(Message message, Destination replyTo) throws JMSException;
void setConnection(Connection connection); void setConnection(Connection connection);

View File

@ -18,6 +18,7 @@ package org.apache.activemq.network.jms;
import javax.jms.Connection; import javax.jms.Connection;
import javax.jms.Destination; import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.JMSException; import javax.jms.JMSException;
import javax.jms.Queue; import javax.jms.Queue;
import javax.jms.QueueConnection; import javax.jms.QueueConnection;
@ -33,8 +34,6 @@ import org.slf4j.LoggerFactory;
* A Bridge to other JMS Queue providers * A Bridge to other JMS Queue providers
* *
* @org.apache.xbean.XBean * @org.apache.xbean.XBean
*
*
*/ */
public class JmsQueueConnector extends JmsConnector { public class JmsQueueConnector extends JmsConnector {
private static final Logger LOG = LoggerFactory.getLogger(JmsQueueConnector.class); private static final Logger LOG = LoggerFactory.getLogger(JmsQueueConnector.class);
@ -42,28 +41,9 @@ public class JmsQueueConnector extends JmsConnector {
private String localConnectionFactoryName; private String localConnectionFactoryName;
private QueueConnectionFactory outboundQueueConnectionFactory; private QueueConnectionFactory outboundQueueConnectionFactory;
private QueueConnectionFactory localQueueConnectionFactory; private QueueConnectionFactory localQueueConnectionFactory;
private QueueConnection outboundQueueConnection;
private QueueConnection localQueueConnection;
private InboundQueueBridge[] inboundQueueBridges; private InboundQueueBridge[] inboundQueueBridges;
private OutboundQueueBridge[] outboundQueueBridges; private OutboundQueueBridge[] outboundQueueBridges;
public boolean init() {
boolean result = super.init();
if (result) {
try {
initializeForeignQueueConnection();
initializeLocalQueueConnection();
initializeInboundJmsMessageConvertor();
initializeOutboundJmsMessageConvertor();
initializeInboundQueueBridges();
initializeOutboundQueueBridges();
} catch (Exception e) {
LOG.error("Failed to initialize the JMSConnector", e);
}
}
return result;
}
/** /**
* @return Returns the inboundQueueBridges. * @return Returns the inboundQueueBridges.
*/ */
@ -147,28 +127,28 @@ public class JmsQueueConnector extends JmsConnector {
* @return Returns the localQueueConnection. * @return Returns the localQueueConnection.
*/ */
public QueueConnection getLocalQueueConnection() { public QueueConnection getLocalQueueConnection() {
return localQueueConnection; return (QueueConnection) localConnection.get();
} }
/** /**
* @param localQueueConnection The localQueueConnection to set. * @param localQueueConnection The localQueueConnection to set.
*/ */
public void setLocalQueueConnection(QueueConnection localQueueConnection) { public void setLocalQueueConnection(QueueConnection localQueueConnection) {
this.localQueueConnection = localQueueConnection; this.localConnection.set(localQueueConnection);
} }
/** /**
* @return Returns the outboundQueueConnection. * @return Returns the outboundQueueConnection.
*/ */
public QueueConnection getOutboundQueueConnection() { public QueueConnection getOutboundQueueConnection() {
return outboundQueueConnection; return (QueueConnection) foreignConnection.get();
} }
/** /**
* @param outboundQueueConnection The outboundQueueConnection to set. * @param outboundQueueConnection The outboundQueueConnection to set.
*/ */
public void setOutboundQueueConnection(QueueConnection foreignQueueConnection) { public void setOutboundQueueConnection(QueueConnection foreignQueueConnection) {
this.outboundQueueConnection = foreignQueueConnection; this.foreignConnection.set(foreignQueueConnection);
} }
/** /**
@ -179,27 +159,12 @@ public class JmsQueueConnector extends JmsConnector {
this.outboundQueueConnectionFactory = foreignQueueConnectionFactory; this.outboundQueueConnectionFactory = foreignQueueConnectionFactory;
} }
public void restartProducerConnection() throws NamingException, JMSException { @Override
outboundQueueConnection = null; protected void initializeForeignConnection() throws NamingException, JMSException {
initializeForeignQueueConnection();
// the outboundQueueConnection was reestablished - publish the new connection to the bridges final QueueConnection newConnection;
if (inboundQueueBridges != null) {
for (int i = 0; i < inboundQueueBridges.length; i++) {
InboundQueueBridge bridge = inboundQueueBridges[i];
bridge.setConsumerConnection(outboundQueueConnection);
}
}
if (outboundQueueBridges != null) {
for (int i = 0; i < outboundQueueBridges.length; i++) {
OutboundQueueBridge bridge = outboundQueueBridges[i];
bridge.setProducerConnection(outboundQueueConnection);
}
}
}
protected void initializeForeignQueueConnection() throws NamingException, JMSException { if (foreignConnection.get() == null) {
if (outboundQueueConnection == null) {
// get the connection factories // get the connection factories
if (outboundQueueConnectionFactory == null) { if (outboundQueueConnectionFactory == null) {
// look it up from JNDI // look it up from JNDI
@ -207,31 +172,57 @@ public class JmsQueueConnector extends JmsConnector {
outboundQueueConnectionFactory = (QueueConnectionFactory)jndiOutboundTemplate outboundQueueConnectionFactory = (QueueConnectionFactory)jndiOutboundTemplate
.lookup(outboundQueueConnectionFactoryName, QueueConnectionFactory.class); .lookup(outboundQueueConnectionFactoryName, QueueConnectionFactory.class);
if (outboundUsername != null) { if (outboundUsername != null) {
outboundQueueConnection = outboundQueueConnectionFactory newConnection = outboundQueueConnectionFactory
.createQueueConnection(outboundUsername, outboundPassword); .createQueueConnection(outboundUsername, outboundPassword);
} else { } else {
outboundQueueConnection = outboundQueueConnectionFactory.createQueueConnection(); newConnection = outboundQueueConnectionFactory.createQueueConnection();
} }
} else { } else {
throw new JMSException("Cannot create foreignConnection - no information"); throw new JMSException("Cannot create foreignConnection - no information");
} }
} else { } else {
if (outboundUsername != null) { if (outboundUsername != null) {
outboundQueueConnection = outboundQueueConnectionFactory newConnection = outboundQueueConnectionFactory
.createQueueConnection(outboundUsername, outboundPassword); .createQueueConnection(outboundUsername, outboundPassword);
} else { } else {
outboundQueueConnection = outboundQueueConnectionFactory.createQueueConnection(); newConnection = outboundQueueConnectionFactory.createQueueConnection();
} }
} }
} else {
// Clear if for now in case something goes wrong during the init.
newConnection = (QueueConnection) foreignConnection.getAndSet(null);
} }
if (localClientId != null && localClientId.length() > 0) {
outboundQueueConnection.setClientID(getOutboundClientId()); if (outboundClientId != null && outboundClientId.length() > 0) {
newConnection.setClientID(getOutboundClientId());
} }
outboundQueueConnection.start(); newConnection.start();
outboundMessageConvertor.setConnection(newConnection);
// Configure the bridges with the new Outbound connection.
initializeInboundDestinationBridgesOutboundSide(newConnection);
initializeOutboundDestinationBridgesOutboundSide(newConnection);
// Register for any async error notifications now so we can reset in the
// case where there's not a lot of activity and a connection drops.
newConnection.setExceptionListener(new ExceptionListener() {
@Override
public void onException(JMSException exception) {
handleConnectionFailure(newConnection);
}
});
// At this point all looks good, so this our current connection now.
foreignConnection.set(newConnection);
} }
protected void initializeLocalQueueConnection() throws NamingException, JMSException { @Override
if (localQueueConnection == null) { protected void initializeLocalConnection() throws NamingException, JMSException {
final QueueConnection newConnection;
if (localConnection.get() == null) {
// get the connection factories // get the connection factories
if (localQueueConnectionFactory == null) { if (localQueueConnectionFactory == null) {
if (embeddedConnectionFactory == null) { if (embeddedConnectionFactory == null) {
@ -240,83 +231,100 @@ public class JmsQueueConnector extends JmsConnector {
localQueueConnectionFactory = (QueueConnectionFactory)jndiLocalTemplate localQueueConnectionFactory = (QueueConnectionFactory)jndiLocalTemplate
.lookup(localConnectionFactoryName, QueueConnectionFactory.class); .lookup(localConnectionFactoryName, QueueConnectionFactory.class);
if (localUsername != null) { if (localUsername != null) {
localQueueConnection = localQueueConnectionFactory newConnection = localQueueConnectionFactory
.createQueueConnection(localUsername, localPassword); .createQueueConnection(localUsername, localPassword);
} else { } else {
localQueueConnection = localQueueConnectionFactory.createQueueConnection(); newConnection = localQueueConnectionFactory.createQueueConnection();
} }
} else { } else {
throw new JMSException("Cannot create localConnection - no information"); throw new JMSException("Cannot create localConnection - no information");
} }
} else { } else {
localQueueConnection = embeddedConnectionFactory.createQueueConnection(); newConnection = embeddedConnectionFactory.createQueueConnection();
} }
} else { } else {
if (localUsername != null) { if (localUsername != null) {
localQueueConnection = localQueueConnectionFactory.createQueueConnection(localUsername, newConnection = localQueueConnectionFactory.
localPassword); createQueueConnection(localUsername, localPassword);
} else { } else {
localQueueConnection = localQueueConnectionFactory.createQueueConnection(); newConnection = localQueueConnectionFactory.createQueueConnection();
} }
} }
} else {
// Clear if for now in case something goes wrong during the init.
newConnection = (QueueConnection) localConnection.getAndSet(null);
} }
if (localClientId != null && localClientId.length() > 0) { if (localClientId != null && localClientId.length() > 0) {
localQueueConnection.setClientID(getLocalClientId()); newConnection.setClientID(getLocalClientId());
} }
localQueueConnection.start(); newConnection.start();
inboundMessageConvertor.setConnection(newConnection);
// Configure the bridges with the new Local connection.
initializeInboundDestinationBridgesLocalSide(newConnection);
initializeOutboundDestinationBridgesLocalSide(newConnection);
// Register for any async error notifications now so we can reset in the
// case where there's not a lot of activity and a connection drops.
newConnection.setExceptionListener(new ExceptionListener() {
@Override
public void onException(JMSException exception) {
handleConnectionFailure(newConnection);
}
});
// At this point all looks good, so this our current connection now.
localConnection.set(newConnection);
} }
protected void initializeInboundJmsMessageConvertor() { protected void initializeInboundDestinationBridgesOutboundSide(QueueConnection connection) throws JMSException {
inboundMessageConvertor.setConnection(localQueueConnection);
}
protected void initializeOutboundJmsMessageConvertor() {
outboundMessageConvertor.setConnection(outboundQueueConnection);
}
protected void initializeInboundQueueBridges() throws JMSException {
if (inboundQueueBridges != null) { if (inboundQueueBridges != null) {
QueueSession outboundSession = outboundQueueConnection QueueSession outboundSession = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
QueueSession localSession = localQueueConnection.createQueueSession(false, for (InboundQueueBridge bridge : inboundQueueBridges) {
Session.AUTO_ACKNOWLEDGE);
for (int i = 0; i < inboundQueueBridges.length; i++) {
InboundQueueBridge bridge = inboundQueueBridges[i];
String localQueueName = bridge.getLocalQueueName();
Queue activemqQueue = createActiveMQQueue(localSession, localQueueName);
String queueName = bridge.getInboundQueueName(); String queueName = bridge.getInboundQueueName();
Queue foreignQueue = createForeignQueue(outboundSession, queueName); Queue foreignQueue = createForeignQueue(outboundSession, queueName);
bridge.setConsumer(null);
bridge.setConsumerQueue(foreignQueue); bridge.setConsumerQueue(foreignQueue);
bridge.setConsumerConnection(connection);
bridge.setJmsConnector(this);
addInboundBridge(bridge);
}
outboundSession.close();
}
}
protected void initializeInboundDestinationBridgesLocalSide(QueueConnection connection) throws JMSException {
if (inboundQueueBridges != null) {
QueueSession localSession = connection.createQueueSession(false,Session.AUTO_ACKNOWLEDGE);
for (InboundQueueBridge bridge : inboundQueueBridges) {
String localQueueName = bridge.getLocalQueueName();
Queue activemqQueue = createActiveMQQueue(localSession, localQueueName);
bridge.setProducerQueue(activemqQueue); bridge.setProducerQueue(activemqQueue);
bridge.setProducerConnection(localQueueConnection); bridge.setProducerConnection(connection);
bridge.setConsumerConnection(outboundQueueConnection);
if (bridge.getJmsMessageConvertor() == null) { if (bridge.getJmsMessageConvertor() == null) {
bridge.setJmsMessageConvertor(getInboundMessageConvertor()); bridge.setJmsMessageConvertor(getInboundMessageConvertor());
} }
bridge.setJmsConnector(this); bridge.setJmsConnector(this);
addInboundBridge(bridge); addInboundBridge(bridge);
} }
outboundSession.close();
localSession.close(); localSession.close();
} }
} }
protected void initializeOutboundQueueBridges() throws JMSException { protected void initializeOutboundDestinationBridgesOutboundSide(QueueConnection connection) throws JMSException {
if (outboundQueueBridges != null) { if (outboundQueueBridges != null) {
QueueSession outboundSession = outboundQueueConnection QueueSession outboundSession = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
QueueSession localSession = localQueueConnection.createQueueSession(false, for (OutboundQueueBridge bridge : outboundQueueBridges) {
Session.AUTO_ACKNOWLEDGE);
for (int i = 0; i < outboundQueueBridges.length; i++) {
OutboundQueueBridge bridge = outboundQueueBridges[i];
String localQueueName = bridge.getLocalQueueName();
Queue activemqQueue = createActiveMQQueue(localSession, localQueueName);
String queueName = bridge.getOutboundQueueName(); String queueName = bridge.getOutboundQueueName();
Queue foreignQueue = createForeignQueue(outboundSession, queueName); Queue foreignQueue = createForeignQueue(outboundSession, queueName);
bridge.setConsumerQueue(activemqQueue);
bridge.setProducerQueue(foreignQueue); bridge.setProducerQueue(foreignQueue);
bridge.setProducerConnection(outboundQueueConnection); bridge.setProducerConnection(connection);
bridge.setConsumerConnection(localQueueConnection);
if (bridge.getJmsMessageConvertor() == null) { if (bridge.getJmsMessageConvertor() == null) {
bridge.setJmsMessageConvertor(getOutboundMessageConvertor()); bridge.setJmsMessageConvertor(getOutboundMessageConvertor());
} }
@ -324,6 +332,23 @@ public class JmsQueueConnector extends JmsConnector {
addOutboundBridge(bridge); addOutboundBridge(bridge);
} }
outboundSession.close(); outboundSession.close();
}
}
protected void initializeOutboundDestinationBridgesLocalSide(QueueConnection connection) throws JMSException {
if (outboundQueueBridges != null) {
QueueSession localSession =
connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
for (OutboundQueueBridge bridge : outboundQueueBridges) {
String localQueueName = bridge.getLocalQueueName();
Queue activemqQueue = createActiveMQQueue(localSession, localQueueName);
bridge.setConsumer(null);
bridge.setConsumerQueue(activemqQueue);
bridge.setConsumerConnection(connection);
bridge.setJmsConnector(this);
addOutboundBridge(bridge);
}
localSession.close(); localSession.close();
} }
} }
@ -331,7 +356,7 @@ public class JmsQueueConnector extends JmsConnector {
protected Destination createReplyToBridge(Destination destination, Connection replyToProducerConnection, protected Destination createReplyToBridge(Destination destination, Connection replyToProducerConnection,
Connection replyToConsumerConnection) { Connection replyToConsumerConnection) {
Queue replyToProducerQueue = (Queue)destination; Queue replyToProducerQueue = (Queue)destination;
boolean isInbound = replyToProducerConnection.equals(localQueueConnection); boolean isInbound = replyToProducerConnection.equals(localConnection.get());
if (isInbound) { if (isInbound) {
InboundQueueBridge bridge = (InboundQueueBridge)replyToBridges.get(replyToProducerQueue); InboundQueueBridge bridge = (InboundQueueBridge)replyToBridges.get(replyToProducerQueue);

View File

@ -18,12 +18,13 @@ package org.apache.activemq.network.jms;
import javax.jms.Connection; import javax.jms.Connection;
import javax.jms.Destination; import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.JMSException; import javax.jms.JMSException;
import javax.jms.Session;
import javax.jms.Topic; import javax.jms.Topic;
import javax.jms.TopicConnection; import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory; import javax.jms.TopicConnectionFactory;
import javax.jms.TopicSession; import javax.jms.TopicSession;
import javax.jms.Session;
import javax.naming.NamingException; import javax.naming.NamingException;
import org.slf4j.Logger; import org.slf4j.Logger;
@ -33,8 +34,6 @@ import org.slf4j.LoggerFactory;
* A Bridge to other JMS Topic providers * A Bridge to other JMS Topic providers
* *
* @org.apache.xbean.XBean * @org.apache.xbean.XBean
*
*
*/ */
public class JmsTopicConnector extends JmsConnector { public class JmsTopicConnector extends JmsConnector {
private static final Logger LOG = LoggerFactory.getLogger(JmsTopicConnector.class); private static final Logger LOG = LoggerFactory.getLogger(JmsTopicConnector.class);
@ -42,28 +41,9 @@ public class JmsTopicConnector extends JmsConnector {
private String localConnectionFactoryName; private String localConnectionFactoryName;
private TopicConnectionFactory outboundTopicConnectionFactory; private TopicConnectionFactory outboundTopicConnectionFactory;
private TopicConnectionFactory localTopicConnectionFactory; private TopicConnectionFactory localTopicConnectionFactory;
private TopicConnection outboundTopicConnection;
private TopicConnection localTopicConnection;
private InboundTopicBridge[] inboundTopicBridges; private InboundTopicBridge[] inboundTopicBridges;
private OutboundTopicBridge[] outboundTopicBridges; private OutboundTopicBridge[] outboundTopicBridges;
public boolean init() {
boolean result = super.init();
if (result) {
try {
initializeForeignTopicConnection();
initializeLocalTopicConnection();
initializeInboundJmsMessageConvertor();
initializeOutboundJmsMessageConvertor();
initializeInboundTopicBridges();
initializeOutboundTopicBridges();
} catch (Exception e) {
LOG.error("Failed to initialize the JMSConnector", e);
}
}
return result;
}
/** /**
* @return Returns the inboundTopicBridges. * @return Returns the inboundTopicBridges.
*/ */
@ -100,8 +80,7 @@ public class JmsTopicConnector extends JmsConnector {
} }
/** /**
* @param localTopicConnectionFactory The localTopicConnectionFactory to * @param localTopicConnectionFactory The localTopicConnectionFactory to set.
* set.
*/ */
public void setLocalTopicConnectionFactory(TopicConnectionFactory localConnectionFactory) { public void setLocalTopicConnectionFactory(TopicConnectionFactory localConnectionFactory) {
this.localTopicConnectionFactory = localConnectionFactory; this.localTopicConnectionFactory = localConnectionFactory;
@ -122,8 +101,7 @@ public class JmsTopicConnector extends JmsConnector {
} }
/** /**
* @param outboundTopicConnectionFactoryName The * @param outboundTopicConnectionFactoryName The outboundTopicConnectionFactoryName to set.
* outboundTopicConnectionFactoryName to set.
*/ */
public void setOutboundTopicConnectionFactoryName(String foreignTopicConnectionFactoryName) { public void setOutboundTopicConnectionFactoryName(String foreignTopicConnectionFactoryName) {
this.outboundTopicConnectionFactoryName = foreignTopicConnectionFactoryName; this.outboundTopicConnectionFactoryName = foreignTopicConnectionFactoryName;
@ -147,45 +125,43 @@ public class JmsTopicConnector extends JmsConnector {
* @return Returns the localTopicConnection. * @return Returns the localTopicConnection.
*/ */
public TopicConnection getLocalTopicConnection() { public TopicConnection getLocalTopicConnection() {
return localTopicConnection; return (TopicConnection) localConnection.get();
} }
/** /**
* @param localTopicConnection The localTopicConnection to set. * @param localTopicConnection The localTopicConnection to set.
*/ */
public void setLocalTopicConnection(TopicConnection localTopicConnection) { public void setLocalTopicConnection(TopicConnection localTopicConnection) {
this.localTopicConnection = localTopicConnection; this.localConnection.set(localTopicConnection);
} }
/** /**
* @return Returns the outboundTopicConnection. * @return Returns the outboundTopicConnection.
*/ */
public TopicConnection getOutboundTopicConnection() { public TopicConnection getOutboundTopicConnection() {
return outboundTopicConnection; return (TopicConnection) foreignConnection.get();
} }
/** /**
* @param outboundTopicConnection The outboundTopicConnection to set. * @param outboundTopicConnection The outboundTopicConnection to set.
*/ */
public void setOutboundTopicConnection(TopicConnection foreignTopicConnection) { public void setOutboundTopicConnection(TopicConnection foreignTopicConnection) {
this.outboundTopicConnection = foreignTopicConnection; this.foreignConnection.set(foreignTopicConnection);
} }
/** /**
* @param outboundTopicConnectionFactory The outboundTopicConnectionFactory * @param outboundTopicConnectionFactory The outboundTopicConnectionFactory to set.
* to set.
*/ */
public void setOutboundTopicConnectionFactory(TopicConnectionFactory foreignTopicConnectionFactory) { public void setOutboundTopicConnectionFactory(TopicConnectionFactory foreignTopicConnectionFactory) {
this.outboundTopicConnectionFactory = foreignTopicConnectionFactory; this.outboundTopicConnectionFactory = foreignTopicConnectionFactory;
} }
public void restartProducerConnection() throws NamingException, JMSException { @Override
outboundTopicConnection = null; protected void initializeForeignConnection() throws NamingException, JMSException {
initializeForeignTopicConnection();
}
protected void initializeForeignTopicConnection() throws NamingException, JMSException { final TopicConnection newConnection;
if (outboundTopicConnection == null) {
if (foreignConnection.get() == null) {
// get the connection factories // get the connection factories
if (outboundTopicConnectionFactory == null) { if (outboundTopicConnectionFactory == null) {
// look it up from JNDI // look it up from JNDI
@ -193,31 +169,57 @@ public class JmsTopicConnector extends JmsConnector {
outboundTopicConnectionFactory = (TopicConnectionFactory)jndiOutboundTemplate outboundTopicConnectionFactory = (TopicConnectionFactory)jndiOutboundTemplate
.lookup(outboundTopicConnectionFactoryName, TopicConnectionFactory.class); .lookup(outboundTopicConnectionFactoryName, TopicConnectionFactory.class);
if (outboundUsername != null) { if (outboundUsername != null) {
outboundTopicConnection = outboundTopicConnectionFactory newConnection = outboundTopicConnectionFactory
.createTopicConnection(outboundUsername, outboundPassword); .createTopicConnection(outboundUsername, outboundPassword);
} else { } else {
outboundTopicConnection = outboundTopicConnectionFactory.createTopicConnection(); newConnection = outboundTopicConnectionFactory.createTopicConnection();
} }
} else { } else {
throw new JMSException("Cannot create localConnection - no information"); throw new JMSException("Cannot create foreignConnection - no information");
} }
} else { } else {
if (outboundUsername != null) { if (outboundUsername != null) {
outboundTopicConnection = outboundTopicConnectionFactory newConnection = outboundTopicConnectionFactory
.createTopicConnection(outboundUsername, outboundPassword); .createTopicConnection(outboundUsername, outboundPassword);
} else { } else {
outboundTopicConnection = outboundTopicConnectionFactory.createTopicConnection(); newConnection = outboundTopicConnectionFactory.createTopicConnection();
} }
} }
} else {
// Clear if for now in case something goes wrong during the init.
newConnection = (TopicConnection) foreignConnection.getAndSet(null);
} }
if (localClientId != null && localClientId.length() > 0) {
outboundTopicConnection.setClientID(getOutboundClientId()); if (outboundClientId != null && outboundClientId.length() > 0) {
newConnection.setClientID(getOutboundClientId());
} }
outboundTopicConnection.start(); newConnection.start();
outboundMessageConvertor.setConnection(newConnection);
// Configure the bridges with the new Outbound connection.
initializeInboundDestinationBridgesOutboundSide(newConnection);
initializeOutboundDestinationBridgesOutboundSide(newConnection);
// Register for any async error notifications now so we can reset in the
// case where there's not a lot of activity and a connection drops.
newConnection.setExceptionListener(new ExceptionListener() {
@Override
public void onException(JMSException exception) {
handleConnectionFailure(newConnection);
}
});
// At this point all looks good, so this our current connection now.
foreignConnection.set(newConnection);
} }
protected void initializeLocalTopicConnection() throws NamingException, JMSException { @Override
if (localTopicConnection == null) { protected void initializeLocalConnection() throws NamingException, JMSException {
final TopicConnection newConnection;
if (localConnection.get() == null) {
// get the connection factories // get the connection factories
if (localTopicConnectionFactory == null) { if (localTopicConnectionFactory == null) {
if (embeddedConnectionFactory == null) { if (embeddedConnectionFactory == null) {
@ -226,83 +228,100 @@ public class JmsTopicConnector extends JmsConnector {
localTopicConnectionFactory = (TopicConnectionFactory)jndiLocalTemplate localTopicConnectionFactory = (TopicConnectionFactory)jndiLocalTemplate
.lookup(localConnectionFactoryName, TopicConnectionFactory.class); .lookup(localConnectionFactoryName, TopicConnectionFactory.class);
if (localUsername != null) { if (localUsername != null) {
localTopicConnection = localTopicConnectionFactory newConnection = localTopicConnectionFactory
.createTopicConnection(localUsername, localPassword); .createTopicConnection(localUsername, localPassword);
} else { } else {
localTopicConnection = localTopicConnectionFactory.createTopicConnection(); newConnection = localTopicConnectionFactory.createTopicConnection();
} }
} else { } else {
throw new JMSException("Cannot create localConnection - no information"); throw new JMSException("Cannot create localConnection - no information");
} }
} else { } else {
localTopicConnection = embeddedConnectionFactory.createTopicConnection(); newConnection = embeddedConnectionFactory.createTopicConnection();
} }
} else { } else {
if (localUsername != null) { if (localUsername != null) {
localTopicConnection = localTopicConnectionFactory.createTopicConnection(localUsername, newConnection = localTopicConnectionFactory.
localPassword); createTopicConnection(localUsername, localPassword);
} else { } else {
localTopicConnection = localTopicConnectionFactory.createTopicConnection(); newConnection = localTopicConnectionFactory.createTopicConnection();
} }
} }
} else {
// Clear if for now in case something goes wrong during the init.
newConnection = (TopicConnection) localConnection.getAndSet(null);
} }
if (localClientId != null && localClientId.length() > 0) { if (localClientId != null && localClientId.length() > 0) {
localTopicConnection.setClientID(getLocalClientId()); newConnection.setClientID(getLocalClientId());
} }
localTopicConnection.start(); newConnection.start();
inboundMessageConvertor.setConnection(newConnection);
// Configure the bridges with the new Local connection.
initializeInboundDestinationBridgesLocalSide(newConnection);
initializeOutboundDestinationBridgesLocalSide(newConnection);
// Register for any async error notifications now so we can reset in the
// case where there's not a lot of activity and a connection drops.
newConnection.setExceptionListener(new ExceptionListener() {
@Override
public void onException(JMSException exception) {
handleConnectionFailure(newConnection);
}
});
// At this point all looks good, so this our current connection now.
localConnection.set(newConnection);
} }
protected void initializeInboundJmsMessageConvertor() { protected void initializeInboundDestinationBridgesOutboundSide(TopicConnection connection) throws JMSException {
inboundMessageConvertor.setConnection(localTopicConnection);
}
protected void initializeOutboundJmsMessageConvertor() {
outboundMessageConvertor.setConnection(outboundTopicConnection);
}
protected void initializeInboundTopicBridges() throws JMSException {
if (inboundTopicBridges != null) { if (inboundTopicBridges != null) {
TopicSession outboundSession = outboundTopicConnection TopicSession outboundSession = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
TopicSession localSession = localTopicConnection.createTopicSession(false, for (InboundTopicBridge bridge : inboundTopicBridges) {
Session.AUTO_ACKNOWLEDGE); String TopicName = bridge.getInboundTopicName();
for (int i = 0; i < inboundTopicBridges.length; i++) { Topic foreignTopic = createForeignTopic(outboundSession, TopicName);
InboundTopicBridge bridge = inboundTopicBridges[i]; bridge.setConsumer(null);
bridge.setConsumerTopic(foreignTopic);
bridge.setConsumerConnection(connection);
bridge.setJmsConnector(this);
addInboundBridge(bridge);
}
outboundSession.close();
}
}
protected void initializeInboundDestinationBridgesLocalSide(TopicConnection connection) throws JMSException {
if (inboundTopicBridges != null) {
TopicSession localSession = connection.createTopicSession(false,Session.AUTO_ACKNOWLEDGE);
for (InboundTopicBridge bridge : inboundTopicBridges) {
String localTopicName = bridge.getLocalTopicName(); String localTopicName = bridge.getLocalTopicName();
Topic activemqTopic = createActiveMQTopic(localSession, localTopicName); Topic activemqTopic = createActiveMQTopic(localSession, localTopicName);
String topicName = bridge.getInboundTopicName();
Topic foreignTopic = createForeignTopic(outboundSession, topicName);
bridge.setConsumerTopic(foreignTopic);
bridge.setProducerTopic(activemqTopic); bridge.setProducerTopic(activemqTopic);
bridge.setProducerConnection(localTopicConnection); bridge.setProducerConnection(connection);
bridge.setConsumerConnection(outboundTopicConnection);
if (bridge.getJmsMessageConvertor() == null) { if (bridge.getJmsMessageConvertor() == null) {
bridge.setJmsMessageConvertor(getInboundMessageConvertor()); bridge.setJmsMessageConvertor(getInboundMessageConvertor());
} }
bridge.setJmsConnector(this); bridge.setJmsConnector(this);
addInboundBridge(bridge); addInboundBridge(bridge);
} }
outboundSession.close();
localSession.close(); localSession.close();
} }
} }
protected void initializeOutboundTopicBridges() throws JMSException { protected void initializeOutboundDestinationBridgesOutboundSide(TopicConnection connection) throws JMSException {
if (outboundTopicBridges != null) { if (outboundTopicBridges != null) {
TopicSession outboundSession = outboundTopicConnection TopicSession outboundSession = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
TopicSession localSession = localTopicConnection.createTopicSession(false, for (OutboundTopicBridge bridge : outboundTopicBridges) {
Session.AUTO_ACKNOWLEDGE);
for (int i = 0; i < outboundTopicBridges.length; i++) {
OutboundTopicBridge bridge = outboundTopicBridges[i];
String localTopicName = bridge.getLocalTopicName();
Topic activemqTopic = createActiveMQTopic(localSession, localTopicName);
String topicName = bridge.getOutboundTopicName(); String topicName = bridge.getOutboundTopicName();
Topic foreignTopic = createForeignTopic(outboundSession, topicName); Topic foreignTopic = createForeignTopic(outboundSession, topicName);
bridge.setConsumerTopic(activemqTopic);
bridge.setProducerTopic(foreignTopic); bridge.setProducerTopic(foreignTopic);
bridge.setProducerConnection(outboundTopicConnection); bridge.setProducerConnection(connection);
bridge.setConsumerConnection(localTopicConnection);
if (bridge.getJmsMessageConvertor() == null) { if (bridge.getJmsMessageConvertor() == null) {
bridge.setJmsMessageConvertor(getOutboundMessageConvertor()); bridge.setJmsMessageConvertor(getOutboundMessageConvertor());
} }
@ -310,6 +329,23 @@ public class JmsTopicConnector extends JmsConnector {
addOutboundBridge(bridge); addOutboundBridge(bridge);
} }
outboundSession.close(); outboundSession.close();
}
}
protected void initializeOutboundDestinationBridgesLocalSide(TopicConnection connection) throws JMSException {
if (outboundTopicBridges != null) {
TopicSession localSession =
connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
for (OutboundTopicBridge bridge : outboundTopicBridges) {
String localTopicName = bridge.getLocalTopicName();
Topic activemqTopic = createActiveMQTopic(localSession, localTopicName);
bridge.setConsumer(null);
bridge.setConsumerTopic(activemqTopic);
bridge.setConsumerConnection(connection);
bridge.setJmsConnector(this);
addOutboundBridge(bridge);
}
localSession.close(); localSession.close();
} }
} }
@ -317,7 +353,7 @@ public class JmsTopicConnector extends JmsConnector {
protected Destination createReplyToBridge(Destination destination, Connection replyToProducerConnection, protected Destination createReplyToBridge(Destination destination, Connection replyToProducerConnection,
Connection replyToConsumerConnection) { Connection replyToConsumerConnection) {
Topic replyToProducerTopic = (Topic)destination; Topic replyToProducerTopic = (Topic)destination;
boolean isInbound = replyToProducerConnection.equals(localTopicConnection); boolean isInbound = replyToProducerConnection.equals(localConnection.get());
if (isInbound) { if (isInbound) {
InboundTopicBridge bridge = (InboundTopicBridge)replyToBridges.get(replyToProducerTopic); InboundTopicBridge bridge = (InboundTopicBridge)replyToBridges.get(replyToProducerTopic);

View File

@ -17,11 +17,12 @@
package org.apache.activemq.network.jms; package org.apache.activemq.network.jms;
/** /**
* Create an Outbound Queue Bridge * Create an Outbound Queue Bridge. By default the bridge uses the same
* name for both the inbound and outbound queues, however this can be altered
* by using the public setter methods to configure both inbound and outbound
* queue names.
* *
* @org.apache.xbean.XBean * @org.apache.xbean.XBean
*
*
*/ */
public class OutboundQueueBridge extends QueueBridge { public class OutboundQueueBridge extends QueueBridge {
@ -39,7 +40,7 @@ public class OutboundQueueBridge extends QueueBridge {
} }
/** /**
* Default Contructor * Default Constructor
*/ */
public OutboundQueueBridge() { public OutboundQueueBridge() {
} }
@ -52,6 +53,10 @@ public class OutboundQueueBridge extends QueueBridge {
} }
/** /**
* Sets the name of the outbound queue name. If the inbound queue name
* has not been set already then this method uses the provided queue name
* to set the inbound topic name as well.
*
* @param outboundQueueName The outboundQueueName to set. * @param outboundQueueName The outboundQueueName to set.
*/ */
public void setOutboundQueueName(String outboundQueueName) { public void setOutboundQueueName(String outboundQueueName) {
@ -74,5 +79,4 @@ public class OutboundQueueBridge extends QueueBridge {
public void setLocalQueueName(String localQueueName) { public void setLocalQueueName(String localQueueName) {
this.localQueueName = localQueueName; this.localQueueName = localQueueName;
} }
} }

View File

@ -17,11 +17,12 @@
package org.apache.activemq.network.jms; package org.apache.activemq.network.jms;
/** /**
* Create an Outbound Topic Bridge * Create an Outbound Topic Bridge. By default the bridge uses the same
* name for both the inbound and outbound topics, however this can be altered
* by using the public setter methods to configure both inbound and outbound
* topic names.
* *
* @org.apache.xbean.XBean * @org.apache.xbean.XBean
*
*
*/ */
public class OutboundTopicBridge extends TopicBridge { public class OutboundTopicBridge extends TopicBridge {
@ -52,6 +53,10 @@ public class OutboundTopicBridge extends TopicBridge {
} }
/** /**
* Sets the name of the outbound topic name. If the inbound topic name
* has not been set already then this method uses the provided topic name
* to set the inbound topic name as well.
*
* @param outboundTopicName The outboundTopicName to set. * @param outboundTopicName The outboundTopicName to set.
*/ */
public void setOutboundTopicName(String outboundTopicName) { public void setOutboundTopicName(String outboundTopicName) {

View File

@ -28,9 +28,7 @@ import javax.jms.QueueSession;
import javax.jms.Session; import javax.jms.Session;
/** /**
* A Destination bridge is used to bridge between to different JMS systems * A Destination bridge is used to bridge Queues between to different JMS systems
*
*
*/ */
class QueueBridge extends DestinationBridge { class QueueBridge extends DestinationBridge {
protected Queue consumerQueue; protected Queue consumerQueue;
@ -55,6 +53,7 @@ class QueueBridge extends DestinationBridge {
protected MessageConsumer createConsumer() throws JMSException { protected MessageConsumer createConsumer() throws JMSException {
// set up the consumer // set up the consumer
if (consumerConnection == null) return null;
consumerSession = consumerConnection.createQueueSession(false, Session.CLIENT_ACKNOWLEDGE); consumerSession = consumerConnection.createQueueSession(false, Session.CLIENT_ACKNOWLEDGE);
MessageConsumer consumer = null; MessageConsumer consumer = null;
@ -64,20 +63,28 @@ class QueueBridge extends DestinationBridge {
consumer = consumerSession.createReceiver(consumerQueue); consumer = consumerSession.createReceiver(consumerQueue);
} }
consumer.setMessageListener(this);
return consumer; return consumer;
} }
protected synchronized MessageProducer createProducer() throws JMSException { protected synchronized MessageProducer createProducer() throws JMSException {
if (producerConnection == null) return null;
producerSession = producerConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); producerSession = producerConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
producer = producerSession.createSender(null); producer = producerSession.createSender(null);
return producer; return producer;
} }
protected synchronized void sendMessage(Message message) throws JMSException { protected synchronized void sendMessage(Message message) throws JMSException {
if (producer == null) { if (producer == null && createProducer() == null) {
createProducer(); throw new JMSException("Producer for remote queue not available.");
}
try {
producer.send(producerQueue, message);
} catch (JMSException e) {
producer = null;
throw e;
} }
producer.send(producerQueue, message);
} }
/** /**
@ -92,6 +99,13 @@ class QueueBridge extends DestinationBridge {
*/ */
public void setConsumerConnection(QueueConnection consumerConnection) { public void setConsumerConnection(QueueConnection consumerConnection) {
this.consumerConnection = consumerConnection; this.consumerConnection = consumerConnection;
if (started.get()) {
try {
createConsumer();
} catch(Exception e) {
jmsConnector.handleConnectionFailure(getConnnectionForConsumer());
}
}
} }
/** /**
@ -157,5 +171,4 @@ class QueueBridge extends DestinationBridge {
protected Connection getConnectionForProducer() { protected Connection getConnectionForProducer() {
return getProducerConnection(); return getProducerConnection();
} }
} }

View File

@ -0,0 +1,244 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.network.jms;
/**
* A policy object that defines how a {@link JmsConnector} deals with
* reconnection of the local and foreign connections.
*
* @org.apache.xbean.XBean element="reconnectionPolicy"
*/
public class ReconnectionPolicy {
private int maxSendRetries = 10;
private long sendRetryDelay = 1000L;
private int maxReconnectAttempts = -1;
private int maxInitialConnectAttempts = -1;
private long maximumReconnectDelay = 30000;
private long initialReconnectDelay = 1000L;
private boolean useExponentialBackOff = false;
private double backOffMultiplier = 2.0;
/**
* Gets the maximum number of a times a Message send should be retried before
* a JMSExeception is thrown indicating that the operation failed.
*
* @return number of send retries that will be performed.
*/
public int getMaxSendRetries() {
return maxSendRetries;
}
/**
* Sets the maximum number of a times a Message send should be retried before
* a JMSExeception is thrown indicating that the operation failed.
*
* @param maxRetries
* number of send retries that will be performed.
*/
public void setMaxSendRetries(int maxSendRetries) {
this.maxSendRetries = maxSendRetries;
}
/**
* Get the amount of time the DestionationBridge will wait between attempts
* to forward a message.
*
* @return time in milliseconds to wait between send attempts.
*/
public long getSendRetryDelay() {
return this.sendRetryDelay;
}
/**
* Set the amount of time the DestionationBridge will wait between attempts
* to forward a message. The default policy limits the minimum time between
* send attempt to one second.
*
* @param sendRetryDelay
* Time in milliseconds to wait before attempting another send.
*/
public void setSendRetyDelay(long sendRetryDelay) {
if (sendRetryDelay < 1000L) {
this.sendRetryDelay = 1000L;
}
this.sendRetryDelay = sendRetryDelay;
}
/**
* Gets the number of time that {@link JmsConnector} will attempt to connect
* or reconnect before giving up. By default the policy sets this value to
* a negative value meaning try forever.
*
* @return the number of attempts to connect before giving up.
*/
public int getMaxReconnectAttempts() {
return maxReconnectAttempts;
}
/**
* Sets the number of time that {@link JmsConnector} will attempt to connect
* or reconnect before giving up. By default the policy sets this value to
* a negative value meaning try forever, set to a positive value to retry a
* fixed number of times, or zero to never try and reconnect.
*
* @param maxReconnectAttempts
*/
public void setMaxReconnectAttempts(int maxReconnectAttempts) {
this.maxReconnectAttempts = maxReconnectAttempts;
}
/**
* Gets the maximum number of times that the {@link JmsConnector} will try
* to connect on startup to before it marks itself as failed and does not
* try any further connections.
*
* @returns the max number of times a connection attempt is made before failing.
*/
public int getMaxInitialConnectAttempts() {
return this.maxInitialConnectAttempts;
}
/**
* Sets the maximum number of times that the {@link JmsConnector} will try
* to connect on startup to before it marks itself as failed and does not
* try any further connections.
*
* @param maxAttempts
* The max number of times a connection attempt is made before failing.
*/
public void setMaxInitialConnectAttempts(int maxAttempts) {
this.maxInitialConnectAttempts = maxAttempts;
}
/**
* Gets the maximum delay that is inserted between each attempt to connect
* before another attempt is made. The default setting for this value is
* 30 seconds.
*
* @return the max delay between connection attempts in milliseconds.
*/
public long getMaximumReconnectDelay() {
return maximumReconnectDelay;
}
/**
* Sets the maximum delay that is inserted between each attempt to connect
* before another attempt is made.
*
* @param maximumReconnectDelay
* The maximum delay between connection attempts in milliseconds.
*/
public void setMaximumReconnectDelay(long maximumReconnectDelay) {
this.maximumReconnectDelay = maximumReconnectDelay;
}
/**
* Gets the initial delay value used before a reconnection attempt is made. If the
* use exponential back-off value is set to false then this will be the fixed time
* between connection attempts. By default this value is set to one second.
*
* @return time in milliseconds that will be used between connection retries.
*/
public long getInitialReconnectDelay() {
return initialReconnectDelay;
}
/**
* Gets the initial delay value used before a reconnection attempt is made. If the
* use exponential back-off value is set to false then this will be the fixed time
* between connection attempts. By default this value is set to one second.
* @param initialReconnectDelay
* Time in milliseconds to wait before the first reconnection attempt.
*/
public void setInitialReconnectDelay(long initialReconnectDelay) {
this.initialReconnectDelay = initialReconnectDelay;
}
/**
* Gets whether the policy uses the set back-off multiplier to grow the time between
* connection attempts.
*
* @return true if the policy will grow the time between connection attempts.
*/
public boolean isUseExponentialBackOff() {
return useExponentialBackOff;
}
/**
* Sets whether the policy uses the set back-off multiplier to grow the time between
* connection attempts.
*
* @param useExponentialBackOff
*/
public void setUseExponentialBackOff(boolean useExponentialBackOff) {
this.useExponentialBackOff = useExponentialBackOff;
}
/**
* Gets the multiplier used to grow the delay between connection attempts from the initial
* time to the max set time. By default this value is set to 2.0.
*
* @return the currently configured connection delay multiplier.
*/
public double getBackOffMultiplier() {
return backOffMultiplier;
}
/**
* Gets the multiplier used to grow the delay between connection attempts from the initial
* time to the max set time. By default this value is set to 2.0.
*
* @param backOffMultiplier
* The multiplier value used to grow the reconnection delay.
*/
public void setBackOffMultiplier(double backOffMultiplier) {
this.backOffMultiplier = backOffMultiplier;
}
/**
* Returns the next computed delay value that the connection controller should use to
* wait before attempting another connection for the {@link JmsConnector}.
*
* @param attempt
* The current connection attempt.
*
* @return the next delay amount in milliseconds.
*/
public long getNextDelay(int attempt) {
if (attempt == 0) {
return 0;
}
long nextDelay = initialReconnectDelay;
if (useExponentialBackOff) {
nextDelay = nextDelay * (long)(attempt * backOffMultiplier);
}
if (maximumReconnectDelay > 0 && nextDelay > maximumReconnectDelay) {
nextDelay = maximumReconnectDelay;
}
return nextDelay;
}
}

View File

@ -25,16 +25,15 @@ import javax.jms.Message;
* Converts Message from one JMS to another * Converts Message from one JMS to another
* *
* @org.apache.xbean.XBean * @org.apache.xbean.XBean
*
*
*/ */
public class SimpleJmsMessageConvertor implements JmsMesageConvertor { public class SimpleJmsMessageConvertor implements JmsMesageConvertor {
/** /**
* Convert a foreign JMS Message to a native ActiveMQ Message - Inbound or * Convert a foreign JMS Message to a native ActiveMQ Message - Inbound or
* visa-versa outbound * visa-versa outbound.
* *
* @param message * @param message
* The target message to convert to a native ActiveMQ message
* @return the converted message * @return the converted message
* @throws JMSException * @throws JMSException
*/ */
@ -42,6 +41,19 @@ public class SimpleJmsMessageConvertor implements JmsMesageConvertor {
return message; return message;
} }
/**
* Convert a foreign JMS Message to a native ActiveMQ Message - Inbound or
* visa-versa outbound. If the replyTo Destination instance is not null
* then the Message is configured with the given replyTo value.
*
* @param message
* The target message to convert to a native ActiveMQ message
* @param replyTo
* The replyTo Destination to set on the converted Message.
*
* @return the converted message
* @throws JMSException
*/
public Message convert(Message message, Destination replyTo) throws JMSException { public Message convert(Message message, Destination replyTo) throws JMSException {
Message msg = convert(message); Message msg = convert(message);
if (replyTo != null) { if (replyTo != null) {
@ -55,5 +67,4 @@ public class SimpleJmsMessageConvertor implements JmsMesageConvertor {
public void setConnection(Connection connection) { public void setConnection(Connection connection) {
// do nothing // do nothing
} }
} }

View File

@ -17,7 +17,6 @@
package org.apache.activemq.network.jms; package org.apache.activemq.network.jms;
import javax.jms.Connection; import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException; import javax.jms.JMSException;
import javax.jms.Message; import javax.jms.Message;
import javax.jms.MessageConsumer; import javax.jms.MessageConsumer;
@ -56,6 +55,7 @@ class TopicBridge extends DestinationBridge {
protected MessageConsumer createConsumer() throws JMSException { protected MessageConsumer createConsumer() throws JMSException {
// set up the consumer // set up the consumer
if (consumerConnection == null) return null;
consumerSession = consumerConnection.createTopicSession(false, Session.CLIENT_ACKNOWLEDGE); consumerSession = consumerConnection.createTopicSession(false, Session.CLIENT_ACKNOWLEDGE);
MessageConsumer consumer = null; MessageConsumer consumer = null;
if (consumerName != null && consumerName.length() > 0) { if (consumerName != null && consumerName.length() > 0) {
@ -72,20 +72,29 @@ class TopicBridge extends DestinationBridge {
consumer = consumerSession.createSubscriber(consumerTopic); consumer = consumerSession.createSubscriber(consumerTopic);
} }
} }
consumer.setMessageListener(this);
return consumer; return consumer;
} }
protected synchronized MessageProducer createProducer() throws JMSException { protected synchronized MessageProducer createProducer() throws JMSException {
if (producerConnection == null) return null;
producerSession = producerConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); producerSession = producerConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
producer = producerSession.createPublisher(null); producer = producerSession.createPublisher(null);
return producer; return producer;
} }
protected synchronized void sendMessage(Message message) throws JMSException { protected synchronized void sendMessage(Message message) throws JMSException {
if (producer == null) { if (producer == null && createProducer() == null) {
createProducer(); throw new JMSException("Producer for remote queue not available.");
}
try {
producer.publish(producerTopic, message);
} catch (JMSException e) {
producer = null;
throw e;
} }
producer.publish(producerTopic, message);
} }
/** /**
@ -100,6 +109,13 @@ class TopicBridge extends DestinationBridge {
*/ */
public void setConsumerConnection(TopicConnection consumerConnection) { public void setConsumerConnection(TopicConnection consumerConnection) {
this.consumerConnection = consumerConnection; this.consumerConnection = consumerConnection;
if (started.get()) {
try {
createConsumer();
} catch(Exception e) {
jmsConnector.handleConnectionFailure(getConnnectionForConsumer());
}
}
} }
/** /**

View File

@ -0,0 +1,366 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.network.jms;
import static org.junit.Assert.*;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.util.Wait;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.jms.*;
import java.util.ArrayList;
import java.util.Iterator;
public class QueueBridgeStandaloneReconnectTest {
private static final Logger LOG = LoggerFactory.getLogger(QueueBridgeStandaloneReconnectTest.class);
private JmsQueueConnector jmsQueueConnector;
private BrokerService localBroker;
private BrokerService foreignBroker;
private ActiveMQConnectionFactory localConnectionFactory;
private ActiveMQConnectionFactory foreignConnectionFactory;
private Destination outbound;
private Destination inbound;
private ArrayList<Connection> connections = new ArrayList<Connection>();
@Test
public void testSendAndReceiveOverConnectedBridges() throws Exception {
startLocalBroker();
startForeignBroker();
jmsQueueConnector.start();
sendMessageToForeignBroker("to.foreign.broker");
sendMessageToLocalBroker("to.local.broker");
final MessageConsumer local = createConsumerForLocalBroker();
assertTrue("Should have received a Message.", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
Message message = local.receive(100);
if (message != null && ((TextMessage) message).getText().equals("to.local.broker")) {
return true;
}
return false;
}
}));
final MessageConsumer foreign = createConsumerForForeignBroker();
assertTrue("Should have received a Message.", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
Message message = foreign.receive(100);
if (message != null && ((TextMessage) message).getText().equals("to.foreign.broker")) {
return true;
}
return false;
}
}));
}
@Test
public void testSendAndReceiveOverBridgeWhenStartedBeforeBrokers() throws Exception {
jmsQueueConnector.start();
startLocalBroker();
startForeignBroker();
assertTrue("Should have Connected.", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return jmsQueueConnector.isConnected();
}
}));
sendMessageToForeignBroker("to.foreign.broker");
sendMessageToLocalBroker("to.local.broker");
final MessageConsumer local = createConsumerForLocalBroker();
assertTrue("Should have received a Message.", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
Message message = local.receive(100);
if (message != null && ((TextMessage) message).getText().equals("to.local.broker")) {
return true;
}
return false;
}
}));
final MessageConsumer foreign = createConsumerForForeignBroker();
assertTrue("Should have received a Message.", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
Message message = foreign.receive(100);
if (message != null && ((TextMessage) message).getText().equals("to.foreign.broker")) {
return true;
}
return false;
}
}));
}
@Test
public void testSendAndReceiveOverBridgeWithRestart() throws Exception {
startLocalBroker();
startForeignBroker();
jmsQueueConnector.start();
assertTrue("Should have Connected.", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return jmsQueueConnector.isConnected();
}
}));
stopLocalBroker();
stopForeignBroker();
assertTrue("Should have detected connection drop.", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return !jmsQueueConnector.isConnected();
}
}));
startLocalBroker();
startForeignBroker();
assertTrue("Should have Re-Connected.", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return jmsQueueConnector.isConnected();
}
}));
sendMessageToForeignBroker("to.foreign.broker");
sendMessageToLocalBroker("to.local.broker");
final MessageConsumer local = createConsumerForLocalBroker();
assertTrue("Should have received a Message.", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
Message message = local.receive(100);
if (message != null && ((TextMessage) message).getText().equals("to.local.broker")) {
return true;
}
return false;
}
}));
final MessageConsumer foreign = createConsumerForForeignBroker();
assertTrue("Should have received a Message.", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
Message message = foreign.receive(100);
if (message != null && ((TextMessage) message).getText().equals("to.foreign.broker")) {
return true;
}
return false;
}
}));
}
@Before
public void setUp() throws Exception {
localConnectionFactory = createLocalConnectionFactory();
foreignConnectionFactory = createForeignConnectionFactory();
outbound = new ActiveMQQueue("RECONNECT.TEST.OUT.QUEUE");
inbound = new ActiveMQQueue("RECONNECT.TEST.IN.QUEUE");
jmsQueueConnector = new JmsQueueConnector();
// Wire the bridges.
jmsQueueConnector.setOutboundQueueBridges(
new OutboundQueueBridge[] {new OutboundQueueBridge("RECONNECT.TEST.OUT.QUEUE")});
jmsQueueConnector.setInboundQueueBridges(
new InboundQueueBridge[] {new InboundQueueBridge("RECONNECT.TEST.IN.QUEUE")});
// Tell it how to reach the two brokers.
jmsQueueConnector.setOutboundQueueConnectionFactory(
new ActiveMQConnectionFactory("tcp://localhost:61617"));
jmsQueueConnector.setLocalQueueConnectionFactory(
new ActiveMQConnectionFactory("tcp://localhost:61616"));
}
@After
public void tearDown() throws Exception {
disposeConsumerConnections();
try {
jmsQueueConnector.stop();
jmsQueueConnector = null;
} catch (Exception e) {
}
try {
stopLocalBroker();
} catch (Throwable e) {
}
try {
stopForeignBroker();
} catch (Throwable e) {
}
}
protected void disposeConsumerConnections() {
for (Iterator<Connection> iter = connections.iterator(); iter.hasNext();) {
Connection connection = iter.next();
try {
connection.close();
} catch (Throwable ignore) {
}
}
}
protected void startLocalBroker() throws Exception {
if (localBroker == null) {
localBroker = createFirstBroker();
localBroker.start();
localBroker.waitUntilStarted();
}
}
protected void stopLocalBroker() throws Exception {
if (localBroker != null) {
localBroker.stop();
localBroker.waitUntilStopped();
localBroker = null;
}
}
protected void startForeignBroker() throws Exception {
if (foreignBroker == null) {
foreignBroker = createSecondBroker();
foreignBroker.start();
foreignBroker.waitUntilStarted();
}
}
protected void stopForeignBroker() throws Exception {
if (foreignBroker != null) {
foreignBroker.stop();
foreignBroker.waitUntilStopped();
foreignBroker = null;
}
}
protected BrokerService createFirstBroker() throws Exception {
BrokerService broker = new BrokerService();
broker.setBrokerName("broker1");
broker.setPersistent(false);
broker.setUseJmx(false);
broker.addConnector("tcp://localhost:61616");
return broker;
}
protected BrokerService createSecondBroker() throws Exception {
BrokerService broker = new BrokerService();
broker.setBrokerName("broker2");
broker.setPersistent(false);
broker.setUseJmx(false);
broker.addConnector("tcp://localhost:61617");
return broker;
}
protected ActiveMQConnectionFactory createLocalConnectionFactory() {
return new ActiveMQConnectionFactory("tcp://localhost:61616");
}
protected ActiveMQConnectionFactory createForeignConnectionFactory() {
return new ActiveMQConnectionFactory("tcp://localhost:61617");
}
protected void sendMessageToForeignBroker(String text) throws JMSException {
Connection connection = null;
try {
connection = localConnectionFactory.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(outbound);
TextMessage message = session.createTextMessage();
message.setText(text);
producer.send(message);
} finally {
try {
connection.close();
} catch (Throwable ignore) {
}
}
}
protected void sendMessageToLocalBroker(String text) throws JMSException {
Connection connection = null;
try {
connection = foreignConnectionFactory.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(inbound);
TextMessage message = session.createTextMessage();
message.setText(text);
producer.send(message);
} finally {
try {
connection.close();
} catch (Throwable ignore) {
}
}
}
protected MessageConsumer createConsumerForLocalBroker() throws JMSException {
Connection connection = localConnectionFactory.createConnection();
connections.add(connection);
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
return session.createConsumer(inbound);
}
protected MessageConsumer createConsumerForForeignBroker() throws JMSException {
Connection connection = foreignConnectionFactory.createConnection();
connections.add(connection);
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
return session.createConsumer(outbound);
}
}

View File

@ -0,0 +1,338 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.network.jms;
import static org.junit.Assert.*;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.util.Wait;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.jms.*;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.concurrent.TimeUnit;
/**
* These test cases are used to verify that queue outbound bridge connections get
* re-established in all broker restart scenarios. This is possible when the
* outbound bridge is configured using the failover URI with a timeout.
*/
public class QueueOutboundBridgeReconnectTest {
private static final Logger LOG = LoggerFactory.getLogger(QueueOutboundBridgeReconnectTest.class);
private BrokerService producerBroker;
private BrokerService consumerBroker;
private ActiveMQConnectionFactory producerConnectionFactory;
private ActiveMQConnectionFactory consumerConnectionFactory;
private Destination destination;
private ArrayList<Connection> connections = new ArrayList<Connection>();
@Test
public void testMultipleProducerBrokerRestarts() throws Exception {
for (int i = 0; i < 10; i++) {
testWithProducerBrokerRestart();
disposeConsumerConnections();
}
}
@Test
public void testRestartProducerWithNoConsumer() throws Exception {
stopConsumerBroker();
startProducerBroker();
sendMessage("test123");
sendMessage("test456");
}
@Test
public void testWithoutRestartsConsumerFirst() throws Exception {
startConsumerBroker();
startProducerBroker();
sendMessage("test123");
sendMessage("test456");
MessageConsumer consumer = createConsumer();
Message message = consumer.receive(3000);
assertNotNull(message);
assertEquals("test123", ((TextMessage)message).getText());
message = consumer.receive(3000);
assertNotNull(message);
assertEquals("test456", ((TextMessage)message).getText());
assertNull(consumer.receiveNoWait());
}
@Test
public void testWithoutRestartsProducerFirst() throws Exception {
startProducerBroker();
sendMessage("test123");
startConsumerBroker();
// unless using a failover URI, the first attempt of this send will likely fail,
// so increase the timeout below to give the bridge time to recover
sendMessage("test456");
MessageConsumer consumer = createConsumer();
Message message = consumer.receive(5000);
assertNotNull(message);
assertEquals("test123", ((TextMessage) message).getText());
message = consumer.receive(5000);
assertNotNull(message);
assertEquals("test456", ((TextMessage) message).getText());
assertNull(consumer.receiveNoWait());
}
@Test
public void testWithProducerBrokerRestart() throws Exception {
startProducerBroker();
startConsumerBroker();
sendMessage("test123");
MessageConsumer consumer = createConsumer();
Message message = consumer.receive(5000);
assertNotNull(message);
assertEquals("test123", ((TextMessage)message).getText());
assertNull(consumer.receiveNoWait());
// Restart the first broker...
stopProducerBroker();
startProducerBroker();
sendMessage("test123");
message = consumer.receive(5000);
assertNotNull(message);
assertEquals("test123", ((TextMessage)message).getText());
assertNull(consumer.receiveNoWait());
}
@Test
public void testWithConsumerBrokerRestart() throws Exception {
startProducerBroker();
startConsumerBroker();
sendMessage("test123");
final MessageConsumer consumer1 = createConsumer();
Message message = consumer1.receive(5000);
assertNotNull(message);
assertEquals("test123", ((TextMessage)message).getText());
assertNull(consumer1.receiveNoWait());
consumer1.close();
// Restart the first broker...
stopConsumerBroker();
startConsumerBroker();
// unless using a failover URI, the first attempt of this send will likely fail,
// so increase the timeout below to give the bridge time to recover
sendMessage("test123");
final MessageConsumer consumer2 = createConsumer();
assertTrue("Expected recover and delivery failed", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
Message message = consumer2.receiveNoWait();
if (message == null || !((TextMessage)message).getText().equals("test123")) {
return false;
}
return true;
}
}));
assertNull(consumer2.receiveNoWait());
}
@Test
public void testWithConsumerBrokerStartDelay() throws Exception {
startConsumerBroker();
final MessageConsumer consumer = createConsumer();
TimeUnit.SECONDS.sleep(5);
startProducerBroker();
sendMessage("test123");
assertTrue("Expected recover and delivery failed", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
Message message = consumer.receiveNoWait();
if (message == null || !((TextMessage)message).getText().equals("test123")) {
return false;
}
return true;
}
}));
assertNull(consumer.receiveNoWait());
}
@Test
public void testWithProducerBrokerStartDelay() throws Exception {
startProducerBroker();
TimeUnit.SECONDS.sleep(5);
startConsumerBroker();
MessageConsumer consumer = createConsumer();
sendMessage("test123");
Message message = consumer.receive(5000);
assertNotNull(message);
assertEquals("test123", ((TextMessage)message).getText());
assertNull(consumer.receiveNoWait());
}
@Before
public void setUp() throws Exception {
producerConnectionFactory = createProducerConnectionFactory();
consumerConnectionFactory = createConsumerConnectionFactory();
destination = new ActiveMQQueue("RECONNECT.TEST.QUEUE");
}
@After
public void tearDown() throws Exception {
disposeConsumerConnections();
try {
stopProducerBroker();
} catch (Throwable e) {
}
try {
stopConsumerBroker();
} catch (Throwable e) {
}
}
protected void disposeConsumerConnections() {
for (Iterator<Connection> iter = connections.iterator(); iter.hasNext();) {
Connection connection = iter.next();
try {
connection.close();
} catch (Throwable ignore) {
}
}
}
protected void startProducerBroker() throws Exception {
if (producerBroker == null) {
producerBroker = createFirstBroker();
producerBroker.start();
}
}
protected void stopProducerBroker() throws Exception {
if (producerBroker != null) {
producerBroker.stop();
producerBroker = null;
}
}
protected void startConsumerBroker() throws Exception {
if (consumerBroker == null) {
consumerBroker = createSecondBroker();
consumerBroker.start();
}
}
protected void stopConsumerBroker() throws Exception {
if (consumerBroker != null) {
consumerBroker.stop();
consumerBroker = null;
}
}
protected BrokerService createFirstBroker() throws Exception {
BrokerService broker = new BrokerService();
broker.setBrokerName("broker1");
broker.setPersistent(false);
broker.setUseJmx(false);
broker.addConnector("tcp://localhost:61616");
broker.addConnector("vm://broker1");
JmsQueueConnector jmsQueueConnector = new JmsQueueConnector();
jmsQueueConnector.setOutboundQueueBridges(
new OutboundQueueBridge[] {new OutboundQueueBridge("RECONNECT.TEST.QUEUE")});
jmsQueueConnector.setOutboundQueueConnectionFactory(
new ActiveMQConnectionFactory("tcp://localhost:61617"));
broker.setJmsBridgeConnectors(new JmsConnector[]{jmsQueueConnector});
return broker;
}
protected BrokerService createSecondBroker() throws Exception {
BrokerService broker = new BrokerService();
broker.setBrokerName("broker2");
broker.setPersistent(false);
broker.setUseJmx(false);
broker.addConnector("tcp://localhost:61617");
broker.addConnector("vm://broker2");
return broker;
}
protected ActiveMQConnectionFactory createProducerConnectionFactory() {
return new ActiveMQConnectionFactory("vm://broker1");
}
protected ActiveMQConnectionFactory createConsumerConnectionFactory() {
return new ActiveMQConnectionFactory("vm://broker2");
}
protected void sendMessage(String text) throws JMSException {
Connection connection = null;
try {
connection = producerConnectionFactory.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(destination);
TextMessage message = session.createTextMessage();
message.setText(text);
producer.send(message);
} finally {
try {
connection.close();
} catch (Throwable ignore) {
}
}
}
protected MessageConsumer createConsumer() throws JMSException {
Connection connection = consumerConnectionFactory.createConnection();
connections.add(connection);
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
return session.createConsumer(destination);
}
}

View File

@ -0,0 +1,363 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.network.jms;
import static org.junit.Assert.*;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.util.Wait;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.jms.*;
import java.util.ArrayList;
import java.util.Iterator;
public class TopicBridgeStandaloneReconnectTest {
private static final Logger LOG = LoggerFactory.getLogger(TopicBridgeStandaloneReconnectTest.class);
private JmsTopicConnector jmsTopicConnector;
private BrokerService localBroker;
private BrokerService foreignBroker;
private ActiveMQConnectionFactory localConnectionFactory;
private ActiveMQConnectionFactory foreignConnectionFactory;
private Destination outbound;
private Destination inbound;
private ArrayList<Connection> connections = new ArrayList<Connection>();
@Test
public void testSendAndReceiveOverConnectedBridges() throws Exception {
startLocalBroker();
startForeignBroker();
jmsTopicConnector.start();
final MessageConsumer local = createConsumerForLocalBroker();
final MessageConsumer foreign = createConsumerForForeignBroker();
sendMessageToForeignBroker("to.foreign.broker");
sendMessageToLocalBroker("to.local.broker");
assertTrue("Should have received a Message.", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
Message message = local.receive(100);
if (message != null && ((TextMessage) message).getText().equals("to.local.broker")) {
return true;
}
return false;
}
}));
assertTrue("Should have received a Message.", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
Message message = foreign.receive(100);
if (message != null && ((TextMessage) message).getText().equals("to.foreign.broker")) {
return true;
}
return false;
}
}));
}
@Test
public void testSendAndReceiveOverBridgeWhenStartedBeforeBrokers() throws Exception {
jmsTopicConnector.start();
startLocalBroker();
startForeignBroker();
assertTrue("Should have Connected.", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return jmsTopicConnector.isConnected();
}
}));
final MessageConsumer local = createConsumerForLocalBroker();
final MessageConsumer foreign = createConsumerForForeignBroker();
sendMessageToForeignBroker("to.foreign.broker");
sendMessageToLocalBroker("to.local.broker");
assertTrue("Should have received a Message.", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
Message message = local.receive(100);
if (message != null && ((TextMessage) message).getText().equals("to.local.broker")) {
return true;
}
return false;
}
}));
assertTrue("Should have received a Message.", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
Message message = foreign.receive(100);
if (message != null && ((TextMessage) message).getText().equals("to.foreign.broker")) {
return true;
}
return false;
}
}));
}
@Test
public void testSendAndReceiveOverBridgeWithRestart() throws Exception {
startLocalBroker();
startForeignBroker();
jmsTopicConnector.start();
assertTrue("Should have Connected.", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return jmsTopicConnector.isConnected();
}
}));
stopLocalBroker();
stopForeignBroker();
assertTrue("Should have detected connection drop.", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return !jmsTopicConnector.isConnected();
}
}));
startLocalBroker();
startForeignBroker();
assertTrue("Should have Re-Connected.", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return jmsTopicConnector.isConnected();
}
}));
final MessageConsumer local = createConsumerForLocalBroker();
final MessageConsumer foreign = createConsumerForForeignBroker();
sendMessageToForeignBroker("to.foreign.broker");
sendMessageToLocalBroker("to.local.broker");
assertTrue("Should have received a Message.", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
Message message = local.receive(100);
if (message != null && ((TextMessage) message).getText().equals("to.local.broker")) {
return true;
}
return false;
}
}));
assertTrue("Should have received a Message.", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
Message message = foreign.receive(100);
if (message != null && ((TextMessage) message).getText().equals("to.foreign.broker")) {
return true;
}
return false;
}
}));
}
@Before
public void setUp() throws Exception {
localConnectionFactory = createLocalConnectionFactory();
foreignConnectionFactory = createForeignConnectionFactory();
outbound = new ActiveMQTopic("RECONNECT.TEST.OUT.TOPIC");
inbound = new ActiveMQTopic("RECONNECT.TEST.IN.TOPIC");
jmsTopicConnector = new JmsTopicConnector();
// Wire the bridges.
jmsTopicConnector.setOutboundTopicBridges(
new OutboundTopicBridge[] {new OutboundTopicBridge("RECONNECT.TEST.OUT.TOPIC")});
jmsTopicConnector.setInboundTopicBridges(
new InboundTopicBridge[] {new InboundTopicBridge("RECONNECT.TEST.IN.TOPIC")});
// Tell it how to reach the two brokers.
jmsTopicConnector.setOutboundTopicConnectionFactory(
new ActiveMQConnectionFactory("tcp://localhost:61617"));
jmsTopicConnector.setLocalTopicConnectionFactory(
new ActiveMQConnectionFactory("tcp://localhost:61616"));
}
@After
public void tearDown() throws Exception {
disposeConsumerConnections();
try {
jmsTopicConnector.stop();
jmsTopicConnector = null;
} catch (Exception e) {
}
try {
stopLocalBroker();
} catch (Throwable e) {
}
try {
stopForeignBroker();
} catch (Throwable e) {
}
}
protected void disposeConsumerConnections() {
for (Iterator<Connection> iter = connections.iterator(); iter.hasNext();) {
Connection connection = iter.next();
try {
connection.close();
} catch (Throwable ignore) {
}
}
}
protected void startLocalBroker() throws Exception {
if (localBroker == null) {
localBroker = createFirstBroker();
localBroker.start();
localBroker.waitUntilStarted();
}
}
protected void stopLocalBroker() throws Exception {
if (localBroker != null) {
localBroker.stop();
localBroker.waitUntilStopped();
localBroker = null;
}
}
protected void startForeignBroker() throws Exception {
if (foreignBroker == null) {
foreignBroker = createSecondBroker();
foreignBroker.start();
foreignBroker.waitUntilStarted();
}
}
protected void stopForeignBroker() throws Exception {
if (foreignBroker != null) {
foreignBroker.stop();
foreignBroker.waitUntilStopped();
foreignBroker = null;
}
}
protected BrokerService createFirstBroker() throws Exception {
BrokerService broker = new BrokerService();
broker.setBrokerName("broker1");
broker.setPersistent(false);
broker.setUseJmx(false);
broker.addConnector("tcp://localhost:61616");
return broker;
}
protected BrokerService createSecondBroker() throws Exception {
BrokerService broker = new BrokerService();
broker.setBrokerName("broker2");
broker.setPersistent(false);
broker.setUseJmx(false);
broker.addConnector("tcp://localhost:61617");
return broker;
}
protected ActiveMQConnectionFactory createLocalConnectionFactory() {
return new ActiveMQConnectionFactory("tcp://localhost:61616");
}
protected ActiveMQConnectionFactory createForeignConnectionFactory() {
return new ActiveMQConnectionFactory("tcp://localhost:61617");
}
protected void sendMessageToForeignBroker(String text) throws JMSException {
Connection connection = null;
try {
connection = localConnectionFactory.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(outbound);
TextMessage message = session.createTextMessage();
message.setText(text);
producer.send(message);
} finally {
try {
connection.close();
} catch (Throwable ignore) {
}
}
}
protected void sendMessageToLocalBroker(String text) throws JMSException {
Connection connection = null;
try {
connection = foreignConnectionFactory.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(inbound);
TextMessage message = session.createTextMessage();
message.setText(text);
producer.send(message);
} finally {
try {
connection.close();
} catch (Throwable ignore) {
}
}
}
protected MessageConsumer createConsumerForLocalBroker() throws JMSException {
Connection connection = localConnectionFactory.createConnection();
connections.add(connection);
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
return session.createConsumer(inbound);
}
protected MessageConsumer createConsumerForForeignBroker() throws JMSException {
Connection connection = foreignConnectionFactory.createConnection();
connections.add(connection);
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
return session.createConsumer(outbound);
}
}

View File

@ -0,0 +1,326 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.network.jms;
import static org.junit.Assert.*;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.util.Wait;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.jms.*;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.concurrent.TimeUnit;
/**
* These test cases are used to verify that queue outbound bridge connections get
* re-established in all broker restart scenarios. This is possible when the
* outbound bridge is configured using the failover URI with a timeout.
*/
public class TopicOutboundBridgeReconnectTest {
private static final Logger LOG = LoggerFactory.getLogger(TopicOutboundBridgeReconnectTest.class);
private BrokerService producerBroker;
private BrokerService consumerBroker;
private ActiveMQConnectionFactory producerConnectionFactory;
private ActiveMQConnectionFactory consumerConnectionFactory;
private Destination destination;
private ArrayList<Connection> connections = new ArrayList<Connection>();
@Test
public void testMultipleProducerBrokerRestarts() throws Exception {
for (int i = 0; i < 10; i++) {
testWithProducerBrokerRestart();
disposeConsumerConnections();
}
}
@Test
public void testWithoutRestartsConsumerFirst() throws Exception {
startConsumerBroker();
startProducerBroker();
MessageConsumer consumer = createConsumer();
sendMessage("test123");
sendMessage("test456");
Message message = consumer.receive(2000);
assertNotNull(message);
assertEquals("test123", ((TextMessage)message).getText());
message = consumer.receive(5000);
assertNotNull(message);
assertEquals("test456", ((TextMessage)message).getText());
assertNull(consumer.receiveNoWait());
}
@Test
public void testWithoutRestartsProducerFirst() throws Exception {
startProducerBroker();
sendMessage("test123");
startConsumerBroker();
// unless using a failover URI, the first attempt of this send will likely fail, so increase the timeout below
// to give the bridge time to recover
sendMessage("test456");
MessageConsumer consumer = createConsumer();
Message message = consumer.receive(5000);
assertNotNull(message);
assertEquals("test123", ((TextMessage) message).getText());
message = consumer.receive(5000);
assertNotNull(message);
assertEquals("test456", ((TextMessage) message).getText());
assertNull(consumer.receiveNoWait());
}
@Test
public void testWithProducerBrokerRestart() throws Exception {
startProducerBroker();
startConsumerBroker();
MessageConsumer consumer = createConsumer();
sendMessage("test123");
Message message = consumer.receive(5000);
assertNotNull(message);
assertEquals("test123", ((TextMessage)message).getText());
assertNull(consumer.receiveNoWait());
// Restart the first broker...
stopProducerBroker();
startProducerBroker();
sendMessage("test123");
message = consumer.receive(5000);
assertNotNull(message);
assertEquals("test123", ((TextMessage)message).getText());
assertNull(consumer.receiveNoWait());
}
@Test
public void testWithConsumerBrokerRestart() throws Exception {
startProducerBroker();
startConsumerBroker();
final MessageConsumer consumer1 = createConsumer();
sendMessage("test123");
Message message = consumer1.receive(5000);
assertNotNull(message);
assertEquals("test123", ((TextMessage)message).getText());
assertNull(consumer1.receiveNoWait());
consumer1.close();
// Restart the first broker...
stopConsumerBroker();
startConsumerBroker();
// unless using a failover URI, the first attempt of this send will likely fail, so increase the timeout below
// to give the bridge time to recover
sendMessage("test123");
final MessageConsumer consumer2 = createConsumer();
assertTrue("Expected recover and delivery failed", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
Message message = consumer2.receiveNoWait();
if (message == null || !((TextMessage)message).getText().equals("test123")) {
return false;
}
return true;
}
}));
assertNull(consumer2.receiveNoWait());
}
@Test
public void testWithConsumerBrokerStartDelay() throws Exception {
startConsumerBroker();
final MessageConsumer consumer = createConsumer();
TimeUnit.SECONDS.sleep(5);
startProducerBroker();
sendMessage("test123");
assertTrue("Expected recover and delivery failed", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
Message message = consumer.receiveNoWait();
if (message == null || !((TextMessage)message).getText().equals("test123")) {
return false;
}
return true;
}
}));
assertNull(consumer.receiveNoWait());
}
@Test
public void testWithProducerBrokerStartDelay() throws Exception {
startProducerBroker();
TimeUnit.SECONDS.sleep(5);
startConsumerBroker();
MessageConsumer consumer = createConsumer();
sendMessage("test123");
Message message = consumer.receive(2000);
assertNotNull(message);
assertEquals("test123", ((TextMessage)message).getText());
assertNull(consumer.receiveNoWait());
}
@Before
public void setUp() throws Exception {
producerConnectionFactory = createProducerConnectionFactory();
consumerConnectionFactory = createConsumerConnectionFactory();
destination = new ActiveMQTopic("RECONNECT.TEST.TOPIC");
}
@After
public void tearDown() throws Exception {
disposeConsumerConnections();
try {
stopProducerBroker();
} catch (Throwable e) {
}
try {
stopConsumerBroker();
} catch (Throwable e) {
}
}
protected void disposeConsumerConnections() {
for (Iterator<Connection> iter = connections.iterator(); iter.hasNext();) {
Connection connection = iter.next();
try {
connection.close();
} catch (Throwable ignore) {
}
}
}
protected void startProducerBroker() throws Exception {
if (producerBroker == null) {
producerBroker = createFirstBroker();
producerBroker.start();
}
}
protected void stopProducerBroker() throws Exception {
if (producerBroker != null) {
producerBroker.stop();
producerBroker = null;
}
}
protected void startConsumerBroker() throws Exception {
if (consumerBroker == null) {
consumerBroker = createSecondBroker();
consumerBroker.start();
}
}
protected void stopConsumerBroker() throws Exception {
if (consumerBroker != null) {
consumerBroker.stop();
consumerBroker = null;
}
}
protected BrokerService createFirstBroker() throws Exception {
BrokerService broker = new BrokerService();
broker.setBrokerName("broker1");
broker.setPersistent(false);
broker.setUseJmx(false);
broker.addConnector("tcp://localhost:61616");
broker.addConnector("vm://broker1");
JmsTopicConnector jmsTopicConnector = new JmsTopicConnector();
jmsTopicConnector.setOutboundTopicBridges(
new OutboundTopicBridge[] {new OutboundTopicBridge("RECONNECT.TEST.TOPIC")});
jmsTopicConnector.setOutboundTopicConnectionFactory(
new ActiveMQConnectionFactory("tcp://localhost:61617"));
broker.setJmsBridgeConnectors(new JmsConnector[]{jmsTopicConnector});
return broker;
}
protected BrokerService createSecondBroker() throws Exception {
BrokerService broker = new BrokerService();
broker.setBrokerName("broker2");
broker.setPersistent(false);
broker.setUseJmx(false);
broker.addConnector("tcp://localhost:61617");
broker.addConnector("vm://broker2");
return broker;
}
protected ActiveMQConnectionFactory createProducerConnectionFactory() {
return new ActiveMQConnectionFactory("vm://broker1");
}
protected ActiveMQConnectionFactory createConsumerConnectionFactory() {
return new ActiveMQConnectionFactory("vm://broker2");
}
protected void sendMessage(String text) throws JMSException {
Connection connection = null;
try {
connection = producerConnectionFactory.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(destination);
TextMessage message = session.createTextMessage();
message.setText(text);
producer.send(message);
} finally {
try {
connection.close();
} catch (Throwable ignore) {
}
}
}
protected MessageConsumer createConsumer() throws JMSException {
Connection connection = consumerConnectionFactory.createConnection();
connections.add(connection);
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
return session.createConsumer(destination);
}
}