attempt to reconnect to the remote JMS broker if we get a fail sending a message to it - an attempt at fixing AMQ-895

git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@436752 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
James Strachan 2006-08-25 11:01:30 +00:00
parent 09990f2eab
commit fbad105e7d
6 changed files with 213 additions and 158 deletions

View File

@ -17,6 +17,12 @@
*/
package org.apache.activemq.network.jms;
import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.Service;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
@ -24,29 +30,26 @@ import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.naming.NamingException;
import org.apache.activemq.Service;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
/**
* A Destination bridge is used to bridge between to different JMS systems
*
* @version $Revision: 1.1.1.1 $
*/
public abstract class DestinationBridge implements Service,MessageListener{
private static final Log log=LogFactory.getLog(DestinationBridge.class);
public abstract class DestinationBridge implements Service, MessageListener {
private static final Log log = LogFactory.getLog(DestinationBridge.class);
protected MessageConsumer consumer;
protected AtomicBoolean started=new AtomicBoolean(false);
protected AtomicBoolean started = new AtomicBoolean(false);
protected JmsMesageConvertor jmsMessageConvertor;
protected boolean doHandleReplyTo = true;
protected JmsConnector jmsConnector;
private int maximumRetries = 10;
/**
* @return Returns the consumer.
*/
public MessageConsumer getConsumer(){
public MessageConsumer getConsumer() {
return consumer;
}
@ -54,88 +57,110 @@ public abstract class DestinationBridge implements Service,MessageListener{
* @param consumer
* The consumer to set.
*/
public void setConsumer(MessageConsumer consumer){
this.consumer=consumer;
public void setConsumer(MessageConsumer consumer) {
this.consumer = consumer;
}
/**
* @param connector
*/
public void setJmsConnector(JmsConnector connector){
public void setJmsConnector(JmsConnector connector) {
this.jmsConnector = connector;
}
/**
* @return Returns the inboundMessageConvertor.
*/
public JmsMesageConvertor getJmsMessageConvertor(){
public JmsMesageConvertor getJmsMessageConvertor() {
return jmsMessageConvertor;
}
/**
* @param jmsMessageConvertor
*/
public void setJmsMessageConvertor(JmsMesageConvertor jmsMessageConvertor){
this.jmsMessageConvertor=jmsMessageConvertor;
public void setJmsMessageConvertor(JmsMesageConvertor jmsMessageConvertor) {
this.jmsMessageConvertor = jmsMessageConvertor;
}
public int getMaximumRetries() {
return maximumRetries;
}
protected Destination processReplyToDestination (Destination destination){
/**
* Sets the maximum number of retries if a send fails before closing the
* bridge
*/
public void setMaximumRetries(int maximumRetries) {
this.maximumRetries = maximumRetries;
}
protected Destination processReplyToDestination(Destination destination) {
return jmsConnector.createReplyToBridge(destination, getConnnectionForConsumer(), getConnectionForProducer());
}
public void start() throws Exception{
if(started.compareAndSet(false,true)){
MessageConsumer consumer=createConsumer();
public void start() throws Exception {
if (started.compareAndSet(false, true)) {
MessageConsumer consumer = createConsumer();
consumer.setMessageListener(this);
createProducer();
}
}
public void stop() throws Exception{
public void stop() throws Exception {
started.set(false);
}
public void onMessage(Message message){
if(started.get()&&message!=null){
try{
Message converted;
if(doHandleReplyTo){
Destination replyTo = message.getJMSReplyTo();
if(replyTo != null){
converted = jmsMessageConvertor.convert(message, processReplyToDestination(replyTo));
} else {
converted = jmsMessageConvertor.convert(message);
}
} else {
message.setJMSReplyTo(null);
converted = jmsMessageConvertor.convert(message);
}
sendMessage(converted);
message.acknowledge();
}catch(JMSException e){
log.error("failed to forward message: "+message,e);
try{
stop();
}catch(Exception e1){
log.warn("Failed to stop cleanly",e1);
}
}
}
public void onMessage(Message message) {
if (started.get() && message != null) {
int attempt = 0;
try {
if (attempt > 0) {
restartProducer();
}
Message converted;
if (doHandleReplyTo) {
Destination replyTo = message.getJMSReplyTo();
if (replyTo != null) {
converted = jmsMessageConvertor.convert(message, processReplyToDestination(replyTo));
}
else {
converted = jmsMessageConvertor.convert(message);
}
}
else {
message.setJMSReplyTo(null);
converted = jmsMessageConvertor.convert(message);
}
sendMessage(converted);
message.acknowledge();
}
catch (Exception e) {
log.error("failed to forward message on attempt: " + (++attempt) + " reason: " + e + " message: " + message, e);
if (maximumRetries > 0 && attempt >= maximumRetries) {
try {
stop();
}
catch (Exception e1) {
log.warn("Failed to stop cleanly", e1);
}
}
}
}
}
/**
* @return Returns the doHandleReplyTo.
*/
protected boolean isDoHandleReplyTo(){
protected boolean isDoHandleReplyTo() {
return doHandleReplyTo;
}
/**
* @param doHandleReplyTo The doHandleReplyTo to set.
* @param doHandleReplyTo
* The doHandleReplyTo to set.
*/
protected void setDoHandleReplyTo(boolean doHandleReplyTo){
this.doHandleReplyTo=doHandleReplyTo;
protected void setDoHandleReplyTo(boolean doHandleReplyTo) {
this.doHandleReplyTo = doHandleReplyTo;
}
protected abstract MessageConsumer createConsumer() throws JMSException;
@ -148,5 +173,14 @@ public abstract class DestinationBridge implements Service,MessageListener{
protected abstract Connection getConnectionForProducer();
protected void restartProducer() throws JMSException, NamingException {
try {
getConnectionForProducer().close();
}
catch (Exception e) {
log.debug("Ignoring failure to close producer connection: " + e, e);
}
jmsConnector.restartProducerConnection();
createProducer();
}
}

View File

@ -23,6 +23,8 @@ import java.util.Map;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.naming.NamingException;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.Service;
@ -36,14 +38,15 @@ import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArrayList;
import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
/**
* This bridge joins the gap between foreign JMS providers and ActiveMQ As some JMS providers are still only 1.0.1
* compliant, this bridge itself aimed to be JMS 1.0.2 compliant.
* This bridge joins the gap between foreign JMS providers and ActiveMQ As some
* JMS providers are still only 1.0.1 compliant, this bridge itself aimed to be
* JMS 1.0.2 compliant.
*
* @version $Revision: 1.1.1.1 $
*/
public abstract class JmsConnector implements Service{
public abstract class JmsConnector implements Service {
private static final Log log=LogFactory.getLog(JmsConnector.class);
private static final Log log = LogFactory.getLog(JmsConnector.class);
protected JndiTemplate jndiLocalTemplate;
protected JndiTemplate jndiOutboundTemplate;
protected JmsMesageConvertor inboundMessageConvertor;
@ -52,101 +55,103 @@ public abstract class JmsConnector implements Service{
private List outboundBridges = new CopyOnWriteArrayList();
protected AtomicBoolean initialized = new AtomicBoolean(false);
protected AtomicBoolean started = new AtomicBoolean(false);
protected ActiveMQConnectionFactory embeddedConnectionFactory;
protected int replyToDestinationCacheSize=10000;
protected ActiveMQConnectionFactory embeddedConnectionFactory;
protected int replyToDestinationCacheSize = 10000;
protected String outboundUsername;
protected String outboundPassword;
protected String localUsername;
protected String localPassword;
private String name;
protected LRUCache replyToBridges=new LRUCache(){
protected LRUCache replyToBridges = new LRUCache() {
/**
*
*/
private static final long serialVersionUID = -7446792754185879286L;
protected boolean removeEldestEntry(Map.Entry enty){
if(size()>maxCacheSize){
Iterator iter=entrySet().iterator();
Map.Entry lru=(Map.Entry) iter.next();
protected boolean removeEldestEntry(Map.Entry enty) {
if (size() > maxCacheSize) {
Iterator iter = entrySet().iterator();
Map.Entry lru = (Map.Entry) iter.next();
remove(lru.getKey());
DestinationBridge bridge=(DestinationBridge) lru.getValue();
try{
DestinationBridge bridge = (DestinationBridge) lru.getValue();
try {
bridge.stop();
log.info("Expired bridge: "+bridge);
}catch(Exception e){
log.warn("stopping expired bridge"+bridge+" caused an exception",e);
log.info("Expired bridge: " + bridge);
}
catch (Exception e) {
log.warn("stopping expired bridge" + bridge + " caused an exception", e);
}
}
return false;
}
};
public boolean init(){
boolean result=initialized.compareAndSet(false,true);
if(result){
if(jndiLocalTemplate==null){
jndiLocalTemplate=new JndiTemplate();
public boolean init() {
boolean result = initialized.compareAndSet(false, true);
if (result) {
if (jndiLocalTemplate == null) {
jndiLocalTemplate = new JndiTemplate();
}
if(jndiOutboundTemplate==null){
jndiOutboundTemplate=new JndiTemplate();
if (jndiOutboundTemplate == null) {
jndiOutboundTemplate = new JndiTemplate();
}
if(inboundMessageConvertor==null){
inboundMessageConvertor=new SimpleJmsMessageConvertor();
if (inboundMessageConvertor == null) {
inboundMessageConvertor = new SimpleJmsMessageConvertor();
}
if (outboundMessageConvertor==null){
outboundMessageConvertor=new SimpleJmsMessageConvertor();
if (outboundMessageConvertor == null) {
outboundMessageConvertor = new SimpleJmsMessageConvertor();
}
replyToBridges.setMaxCacheSize(getReplyToDestinationCacheSize());
}
return result;
}
public void start() throws Exception{
public void start() throws Exception {
init();
if (started.compareAndSet(false, true)){
for(int i=0;i<inboundBridges.size();i++){
DestinationBridge bridge=(DestinationBridge) inboundBridges.get(i);
if (started.compareAndSet(false, true)) {
for (int i = 0; i < inboundBridges.size(); i++) {
DestinationBridge bridge = (DestinationBridge) inboundBridges.get(i);
bridge.start();
}
for(int i=0;i<outboundBridges.size();i++){
DestinationBridge bridge=(DestinationBridge) outboundBridges.get(i);
for (int i = 0; i < outboundBridges.size(); i++) {
DestinationBridge bridge = (DestinationBridge) outboundBridges.get(i);
bridge.start();
}
log.info("JMS Connector "+getName()+" Started");
log.info("JMS Connector " + getName() + " Started");
}
}
public void stop() throws Exception{
if(started.compareAndSet(true,false)){
for(int i=0;i<inboundBridges.size();i++){
DestinationBridge bridge=(DestinationBridge) inboundBridges.get(i);
public void stop() throws Exception {
if (started.compareAndSet(true, false)) {
for (int i = 0; i < inboundBridges.size(); i++) {
DestinationBridge bridge = (DestinationBridge) inboundBridges.get(i);
bridge.stop();
}
for(int i=0;i<outboundBridges.size();i++){
DestinationBridge bridge=(DestinationBridge) outboundBridges.get(i);
for (int i = 0; i < outboundBridges.size(); i++) {
DestinationBridge bridge = (DestinationBridge) outboundBridges.get(i);
bridge.stop();
}
log.info("JMS Connector "+getName()+" Stopped");
log.info("JMS Connector " + getName() + " Stopped");
}
}
protected abstract Destination createReplyToBridge(Destination destination, Connection consumerConnection, Connection producerConnection);
/**
* One way to configure the local connection - this is called by
* The BrokerService when the Connector is embedded
* One way to configure the local connection - this is called by The
* BrokerService when the Connector is embedded
*
* @param service
*/
public void setBrokerService(BrokerService service){
public void setBrokerService(BrokerService service) {
embeddedConnectionFactory = new ActiveMQConnectionFactory(service.getVmConnectorURI());
}
/**
* @return Returns the jndiTemplate.
*/
public JndiTemplate getJndiLocalTemplate(){
public JndiTemplate getJndiLocalTemplate() {
return jndiLocalTemplate;
}
@ -154,28 +159,29 @@ public abstract class JmsConnector implements Service{
* @param jndiTemplate
* The jndiTemplate to set.
*/
public void setJndiLocalTemplate(JndiTemplate jndiTemplate){
this.jndiLocalTemplate=jndiTemplate;
public void setJndiLocalTemplate(JndiTemplate jndiTemplate) {
this.jndiLocalTemplate = jndiTemplate;
}
/**
* @return Returns the jndiOutboundTemplate.
*/
public JndiTemplate getJndiOutboundTemplate(){
public JndiTemplate getJndiOutboundTemplate() {
return jndiOutboundTemplate;
}
/**
* @param jndiOutboundTemplate The jndiOutboundTemplate to set.
* @param jndiOutboundTemplate
* The jndiOutboundTemplate to set.
*/
public void setJndiOutboundTemplate(JndiTemplate jndiOutboundTemplate){
this.jndiOutboundTemplate=jndiOutboundTemplate;
public void setJndiOutboundTemplate(JndiTemplate jndiOutboundTemplate) {
this.jndiOutboundTemplate = jndiOutboundTemplate;
}
/**
* @return Returns the inboundMessageConvertor.
*/
public JmsMesageConvertor getInboundMessageConvertor(){
public JmsMesageConvertor getInboundMessageConvertor() {
return inboundMessageConvertor;
}
@ -183,28 +189,29 @@ public abstract class JmsConnector implements Service{
* @param inboundMessageConvertor
* The inboundMessageConvertor to set.
*/
public void setInboundMessageConvertor(JmsMesageConvertor jmsMessageConvertor){
this.inboundMessageConvertor=jmsMessageConvertor;
public void setInboundMessageConvertor(JmsMesageConvertor jmsMessageConvertor) {
this.inboundMessageConvertor = jmsMessageConvertor;
}
/**
* @return Returns the outboundMessageConvertor.
*/
public JmsMesageConvertor getOutboundMessageConvertor(){
public JmsMesageConvertor getOutboundMessageConvertor() {
return outboundMessageConvertor;
}
/**
* @param outboundMessageConvertor The outboundMessageConvertor to set.
* @param outboundMessageConvertor
* The outboundMessageConvertor to set.
*/
public void setOutboundMessageConvertor(JmsMesageConvertor outboundMessageConvertor){
this.outboundMessageConvertor=outboundMessageConvertor;
public void setOutboundMessageConvertor(JmsMesageConvertor outboundMessageConvertor) {
this.outboundMessageConvertor = outboundMessageConvertor;
}
/**
* @return Returns the replyToDestinationCacheSize.
*/
public int getReplyToDestinationCacheSize(){
public int getReplyToDestinationCacheSize() {
return replyToDestinationCacheSize;
}
@ -212,90 +219,95 @@ public abstract class JmsConnector implements Service{
* @param replyToDestinationCacheSize
* The replyToDestinationCacheSize to set.
*/
public void setReplyToDestinationCacheSize(int replyToDestinationCacheSize){
this.replyToDestinationCacheSize=replyToDestinationCacheSize;
public void setReplyToDestinationCacheSize(int replyToDestinationCacheSize) {
this.replyToDestinationCacheSize = replyToDestinationCacheSize;
}
/**
* @return Returns the localPassword.
*/
public String getLocalPassword(){
public String getLocalPassword() {
return localPassword;
}
/**
* @param localPassword The localPassword to set.
* @param localPassword
* The localPassword to set.
*/
public void setLocalPassword(String localPassword){
this.localPassword=localPassword;
public void setLocalPassword(String localPassword) {
this.localPassword = localPassword;
}
/**
* @return Returns the localUsername.
*/
public String getLocalUsername(){
public String getLocalUsername() {
return localUsername;
}
/**
* @param localUsername The localUsername to set.
* @param localUsername
* The localUsername to set.
*/
public void setLocalUsername(String localUsername){
this.localUsername=localUsername;
public void setLocalUsername(String localUsername) {
this.localUsername = localUsername;
}
/**
* @return Returns the outboundPassword.
*/
public String getOutboundPassword(){
public String getOutboundPassword() {
return outboundPassword;
}
/**
* @param outboundPassword The outboundPassword to set.
* @param outboundPassword
* The outboundPassword to set.
*/
public void setOutboundPassword(String outboundPassword){
this.outboundPassword=outboundPassword;
public void setOutboundPassword(String outboundPassword) {
this.outboundPassword = outboundPassword;
}
/**
* @return Returns the outboundUsername.
*/
public String getOutboundUsername(){
public String getOutboundUsername() {
return outboundUsername;
}
/**
* @param outboundUsername The outboundUsername to set.
* @param outboundUsername
* The outboundUsername to set.
*/
public void setOutboundUsername(String outboundUsername){
this.outboundUsername=outboundUsername;
public void setOutboundUsername(String outboundUsername) {
this.outboundUsername = outboundUsername;
}
protected void addInboundBridge(DestinationBridge bridge){
protected void addInboundBridge(DestinationBridge bridge) {
inboundBridges.add(bridge);
}
protected void addOutboundBridge(DestinationBridge bridge){
protected void addOutboundBridge(DestinationBridge bridge) {
outboundBridges.add(bridge);
}
protected void removeInboundBridge(DestinationBridge bridge){
protected void removeInboundBridge(DestinationBridge bridge) {
inboundBridges.add(bridge);
}
protected void removeOutboundBridge(DestinationBridge bridge){
protected void removeOutboundBridge(DestinationBridge bridge) {
outboundBridges.add(bridge);
}
public String getName() {
if( name == null ) {
name = "Connector:"+getNextId();
if (name == null) {
name = "Connector:" + getNextId();
}
return name;
}
static int nextId;
static private synchronized int getNextId() {
return nextId++;
}
@ -303,4 +315,6 @@ public abstract class JmsConnector implements Service{
public void setName(String name) {
this.name = name;
}
public abstract void restartProducerConnection() throws NamingException, JMSException;
}

View File

@ -17,6 +17,9 @@
*/
package org.apache.activemq.network.jms;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
@ -26,9 +29,6 @@ import javax.jms.QueueConnectionFactory;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.naming.NamingException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
* A Bridge to other JMS Queue providers
*
@ -186,6 +186,10 @@ public class JmsQueueConnector extends JmsConnector{
this.outboundQueueConnectionFactory=foreignQueueConnectionFactory;
}
public void restartProducerConnection() throws NamingException, JMSException {
outboundQueueConnection = null;
initializeForeignQueueConnection();
}
protected void initializeForeignQueueConnection() throws NamingException,JMSException{
if(outboundQueueConnection==null){

View File

@ -188,6 +188,10 @@ public class JmsTopicConnector extends JmsConnector{
}
public void restartProducerConnection() throws NamingException, JMSException {
outboundTopicConnection = null;
initializeForeignTopicConnection();
}
protected void initializeForeignTopicConnection() throws NamingException,JMSException{
if(outboundTopicConnection==null){

View File

@ -57,11 +57,9 @@ class QueueBridge extends DestinationBridge{
}
protected MessageConsumer createConsumer() throws JMSException{
// set up the consumer
consumerSession=consumerConnection.createQueueSession(false,Session.CLIENT_ACKNOWLEDGE);
producerSession=producerConnection.createQueueSession(false,Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer=null;
if(selector!=null&&selector.length()>0){
@ -74,6 +72,7 @@ class QueueBridge extends DestinationBridge{
}
protected MessageProducer createProducer() throws JMSException{
producerSession=producerConnection.createQueueSession(false,Session.AUTO_ACKNOWLEDGE);
producer = producerSession.createSender(null);
return producer;
}

View File

@ -60,7 +60,6 @@ class TopicBridge extends DestinationBridge{
protected MessageConsumer createConsumer() throws JMSException{
// set up the consumer
consumerSession=consumerConnection.createTopicSession(false,Session.CLIENT_ACKNOWLEDGE);
producerSession=producerConnection.createTopicSession(false,Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer=null;
if(consumerName!=null&&consumerName.length()>0){
if(selector!=null&&selector.length()>0){
@ -81,6 +80,7 @@ class TopicBridge extends DestinationBridge{
protected MessageProducer createProducer() throws JMSException{
producerSession=producerConnection.createTopicSession(false,Session.AUTO_ACKNOWLEDGE);
producer = producerSession.createPublisher(null);
return producer;
}