diff --git a/activemq-pool/src/main/java/org/apache/activemq/pool/XaConnectionPool.java b/activemq-pool/src/main/java/org/apache/activemq/pool/XaConnectionPool.java index d48e0972ed..d73dccdd33 100644 --- a/activemq-pool/src/main/java/org/apache/activemq/pool/XaConnectionPool.java +++ b/activemq-pool/src/main/java/org/apache/activemq/pool/XaConnectionPool.java @@ -48,6 +48,12 @@ public class XaConnectionPool extends ConnectionPool { if (isXa) { transacted = true; ackMode = Session.SESSION_TRANSACTED; + } else if (transactionManager != null) { + // cmt or transactionManager managed + transacted = false; + if (ackMode == Session.SESSION_TRANSACTED) { + ackMode = Session.AUTO_ACKNOWLEDGE; + } } PooledSession session = (PooledSession) super.createSession(transacted, ackMode); if (isXa) { diff --git a/activemq-pool/src/test/java/org/apache/activemq/pool/XAConnectionPoolTest.java b/activemq-pool/src/test/java/org/apache/activemq/pool/XAConnectionPoolTest.java index 41b92ec946..760b26885c 100644 --- a/activemq-pool/src/test/java/org/apache/activemq/pool/XAConnectionPoolTest.java +++ b/activemq-pool/src/test/java/org/apache/activemq/pool/XAConnectionPoolTest.java @@ -191,4 +191,60 @@ public class XAConnectionPoolTest extends TestSupport { topicConnection.close(); } + public void testSessionArgsIgnoredWithTm() throws Exception { + XaPooledConnectionFactory pcf = new XaPooledConnectionFactory(); + pcf.setConnectionFactory(new ActiveMQConnectionFactory("vm://test?broker.persistent=false")); + // simple TM that with no tx + pcf.setTransactionManager(new TransactionManager() { + @Override + public void begin() throws NotSupportedException, SystemException { + throw new SystemException("NoTx"); + } + + @Override + public void commit() throws HeuristicMixedException, HeuristicRollbackException, IllegalStateException, RollbackException, SecurityException, SystemException { + throw new IllegalStateException("NoTx"); + } + + @Override + public int getStatus() throws SystemException { + return Status.STATUS_NO_TRANSACTION; + } + + @Override + public Transaction getTransaction() throws SystemException { + throw new SystemException("NoTx"); + } + + @Override + public void resume(Transaction tobj) throws IllegalStateException, InvalidTransactionException, SystemException { + throw new IllegalStateException("NoTx"); + } + + @Override + public void rollback() throws IllegalStateException, SecurityException, SystemException { + throw new IllegalStateException("NoTx"); + } + + @Override + public void setRollbackOnly() throws IllegalStateException, SystemException { + throw new IllegalStateException("NoTx"); + } + + @Override + public void setTransactionTimeout(int seconds) throws SystemException { + } + + @Override + public Transaction suspend() throws SystemException { + throw new SystemException("NoTx"); + } + }); + + QueueConnection connection = pcf.createQueueConnection(); + // like ee tck + assertNotNull("can create session(false, 0)", connection.createQueueSession(false, 0)); + + connection.close(); + } }