diff --git a/activemq-osgi/pom.xml b/activemq-osgi/pom.xml index 4e73ac1dfd..49216225fc 100644 --- a/activemq-osgi/pom.xml +++ b/activemq-osgi/pom.xml @@ -63,6 +63,7 @@ javax.jms*, javax.management*, javax.transaction*, + javax.naming*;resolution:=optional, org.apache.commons.pool*;resolution:=optional, org.apache.commons.net*;resolution:=optional, com.sun.syndication*;resolution:=optional, diff --git a/activemq-pool/src/main/java/org/apache/activemq/pool/XaPooledConnectionFactory.java b/activemq-pool/src/main/java/org/apache/activemq/pool/XaPooledConnectionFactory.java index dcf6ac2213..dbc3d5ac74 100644 --- a/activemq-pool/src/main/java/org/apache/activemq/pool/XaPooledConnectionFactory.java +++ b/activemq-pool/src/main/java/org/apache/activemq/pool/XaPooledConnectionFactory.java @@ -16,18 +16,53 @@ */ package org.apache.activemq.pool; +import java.io.Serializable; +import java.util.Hashtable; +import javax.jms.JMSException; +import javax.jms.QueueConnection; +import javax.jms.QueueConnectionFactory; +import javax.jms.TopicConnection; +import javax.jms.TopicConnectionFactory; +import javax.naming.Context; +import javax.naming.InitialContext; +import javax.naming.Name; +import javax.naming.spi.ObjectFactory; import javax.transaction.TransactionManager; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.util.IntrospectionSupport; /** * A pooled connection factory that automatically enlists * sessions in the current active XA transaction if any. */ -public class XaPooledConnectionFactory extends PooledConnectionFactory { +public class XaPooledConnectionFactory extends PooledConnectionFactory implements ObjectFactory, Serializable, QueueConnectionFactory, TopicConnectionFactory +{ private TransactionManager transactionManager; + private boolean tmFromJndi = false; + private String tmJndiName = "java:/TransactionManager"; + + public String getTmJndiName() { + return tmJndiName; + } + + public void setTmJndiName(String tmJndiName) { + this.tmJndiName = tmJndiName; + } + + public boolean isTmFromJndi() { + return tmFromJndi; + } + + /** + * Allow transaction manager resolution from JNDI (ee deployment) + * @param tmFromJndi + */ + public void setTmFromJndi(boolean tmFromJndi) { + this.tmFromJndi = tmFromJndi; + } public XaPooledConnectionFactory() { super(); @@ -42,6 +77,13 @@ public class XaPooledConnectionFactory extends PooledConnectionFactory { } public TransactionManager getTransactionManager() { + if (transactionManager == null && tmFromJndi) { + try { + transactionManager = (TransactionManager) new InitialContext().lookup(getTmJndiName()); + } catch (Throwable ignored) { + ignored.printStackTrace(); + } + } return transactionManager; } @@ -53,4 +95,33 @@ public class XaPooledConnectionFactory extends PooledConnectionFactory { protected ConnectionPool createConnectionPool(ActiveMQConnection connection) { return new XaConnectionPool(connection, getTransactionManager()); } + + @Override + public Object getObjectInstance(Object obj, Name name, Context nameCtx, Hashtable environment) throws Exception { + setTmFromJndi(true); + if (environment != null) { + IntrospectionSupport.setProperties(this, environment); + } + return this; + } + + @Override + public QueueConnection createQueueConnection() throws JMSException { + return (QueueConnection) createConnection(); + } + + @Override + public QueueConnection createQueueConnection(String userName, String password) throws JMSException { + return (QueueConnection) createConnection(userName, password); + } + + @Override + public TopicConnection createTopicConnection() throws JMSException { + return (TopicConnection) createConnection(); + } + + @Override + public TopicConnection createTopicConnection(String userName, String password) throws JMSException { + return (TopicConnection) createConnection(userName, password); + } } diff --git a/activemq-pool/src/test/java/org/apache/activemq/pool/XAConnectionPoolTest.java b/activemq-pool/src/test/java/org/apache/activemq/pool/XAConnectionPoolTest.java index f9486df310..898e7bf8a0 100644 --- a/activemq-pool/src/test/java/org/apache/activemq/pool/XAConnectionPoolTest.java +++ b/activemq-pool/src/test/java/org/apache/activemq/pool/XAConnectionPoolTest.java @@ -16,11 +16,15 @@ */ package org.apache.activemq.pool; +import java.util.Hashtable; import java.util.Vector; +import javax.jms.QueueConnectionFactory; import javax.jms.Session; import javax.jms.TopicConnection; +import javax.jms.TopicConnectionFactory; import javax.jms.TopicPublisher; import javax.jms.TopicSession; +import javax.naming.spi.ObjectFactory; import javax.transaction.HeuristicMixedException; import javax.transaction.HeuristicRollbackException; import javax.transaction.InvalidTransactionException; @@ -140,4 +144,27 @@ public class XAConnectionPoolTest extends TestSupport { } connection.close(); } + + public void testInstanceOf() throws Exception { + XaPooledConnectionFactory pcf = new XaPooledConnectionFactory(); + assertTrue(pcf instanceof QueueConnectionFactory); + assertTrue(pcf instanceof TopicConnectionFactory); + } + + public void testBindable() throws Exception { + XaPooledConnectionFactory pcf = new XaPooledConnectionFactory(); + assertTrue(pcf instanceof ObjectFactory); + assertTrue(((ObjectFactory)pcf).getObjectInstance(null, null, null, null) instanceof XaPooledConnectionFactory); + assertTrue(pcf.isTmFromJndi()); + } + + public void testBindableEnvOverrides() throws Exception { + XaPooledConnectionFactory pcf = new XaPooledConnectionFactory(); + assertTrue(pcf instanceof ObjectFactory); + Hashtable environment = new Hashtable(); + environment.put("tmFromJndi", String.valueOf(Boolean.FALSE)); + assertTrue(((ObjectFactory) pcf).getObjectInstance(null, null, null, environment) instanceof XaPooledConnectionFactory); + assertFalse(pcf.isTmFromJndi()); + } + }