diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java index aee663a3e8..baf5233d6b 100755 --- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java +++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java @@ -820,7 +820,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC for (MessageDispatch old : list) { // ensure we don't filter this as a duplicate if (LOG.isDebugEnabled()) { - LOG.debug("on close, rollback: " + old.getMessage().getMessageId()); + LOG.debug("on close, rollback duplicate: " + old.getMessage().getMessageId()); } session.connection.rollbackDuplicate(this, old.getMessage()); } diff --git a/activemq-client/src/main/java/org/apache/activemq/TransactionContext.java b/activemq-client/src/main/java/org/apache/activemq/TransactionContext.java index b16fcba882..7ff87028ff 100755 --- a/activemq-client/src/main/java/org/apache/activemq/TransactionContext.java +++ b/activemq-client/src/main/java/org/apache/activemq/TransactionContext.java @@ -398,8 +398,9 @@ public class TransactionContext implements XAResource { beforeEnd(); } catch (JMSException e) { throw toXAException(e); + } finally { + setXid(null); } - setXid(null); } else if ((flags & TMSUCCESS) == TMSUCCESS) { // set to null if this is the current xid. // otherwise this could be an asynchronous success call @@ -408,8 +409,9 @@ public class TransactionContext implements XAResource { beforeEnd(); } catch (JMSException e) { throw toXAException(e); + } finally { + setXid(null); } - setXid(null); } } else { throw new XAException(XAException.XAER_INVAL); @@ -684,6 +686,7 @@ public class TransactionContext implements XAResource { this.connection.checkClosedOrFailed(); this.connection.ensureConnectionInfoSent(); } catch (JMSException e) { + disassociate(); throw toXAException(e); } @@ -699,6 +702,7 @@ public class TransactionContext implements XAResource { LOG.debug("Started XA transaction: " + transactionId); } } catch (JMSException e) { + disassociate(); throw toXAException(e); } @@ -712,6 +716,7 @@ public class TransactionContext implements XAResource { LOG.debug("Ended XA transaction: " + transactionId); } } catch (JMSException e) { + disassociate(); throw toXAException(e); } @@ -729,12 +734,16 @@ public class TransactionContext implements XAResource { } } - // dis-associate - associatedXid = null; - transactionId = null; + disassociate(); } } + private void disassociate() { + // dis-associate + associatedXid = null; + transactionId = null; + } + /** * Sends the given command. Also sends the command in case of interruption, * so that important commands like rollback and commit are never interrupted. diff --git a/activemq-pool/src/main/java/org/apache/activemq/pool/XaConnectionPool.java b/activemq-pool/src/main/java/org/apache/activemq/pool/XaConnectionPool.java index d73dccdd33..e0699fedd3 100644 --- a/activemq-pool/src/main/java/org/apache/activemq/pool/XaConnectionPool.java +++ b/activemq-pool/src/main/java/org/apache/activemq/pool/XaConnectionPool.java @@ -46,8 +46,9 @@ public class XaConnectionPool extends ConnectionPool { try { boolean isXa = (transactionManager != null && transactionManager.getStatus() != Status.STATUS_NO_TRANSACTION); if (isXa) { - transacted = true; - ackMode = Session.SESSION_TRANSACTED; + // if the xa tx aborts inflight we don't want to auto create a local transaction or auto ack + transacted = false; + ackMode = Session.CLIENT_ACKNOWLEDGE; } else if (transactionManager != null) { // cmt or transactionManager managed transacted = false; diff --git a/activemq-pool/src/test/java/org/apache/activemq/pool/XAConnectionPoolTest.java b/activemq-pool/src/test/java/org/apache/activemq/pool/XAConnectionPoolTest.java index aef297baf1..56c4f60358 100644 --- a/activemq-pool/src/test/java/org/apache/activemq/pool/XAConnectionPoolTest.java +++ b/activemq-pool/src/test/java/org/apache/activemq/pool/XAConnectionPoolTest.java @@ -151,6 +151,104 @@ public class XAConnectionPoolTest extends TestSupport { connection.close(); } + public void testAckModeOfPoolNonXAWithTM() throws Exception { + final Vector syncs = new Vector(); + ActiveMQTopic topic = new ActiveMQTopic("test"); + XaPooledConnectionFactory pcf = new XaPooledConnectionFactory(); + pcf.setConnectionFactory(new ActiveMQConnectionFactory("vm://test?broker.persistent=false")); + + // simple TM that is in a tx and will track syncs + pcf.setTransactionManager(new TransactionManager(){ + @Override + public void begin() throws NotSupportedException, SystemException { + } + + @Override + public void commit() throws HeuristicMixedException, HeuristicRollbackException, IllegalStateException, RollbackException, SecurityException, SystemException { + } + + @Override + public int getStatus() throws SystemException { + return Status.STATUS_ACTIVE; + } + + @Override + public Transaction getTransaction() throws SystemException { + return new Transaction() { + @Override + public void commit() throws HeuristicMixedException, HeuristicRollbackException, RollbackException, SecurityException, SystemException { + } + + @Override + public boolean delistResource(XAResource xaRes, int flag) throws IllegalStateException, SystemException { + return false; + } + + @Override + public boolean enlistResource(XAResource xaRes) throws IllegalStateException, RollbackException, SystemException { + return false; + } + + @Override + public int getStatus() throws SystemException { + return 0; + } + + @Override + public void registerSynchronization(Synchronization synch) throws IllegalStateException, RollbackException, SystemException { + syncs.add(synch); + } + + @Override + public void rollback() throws IllegalStateException, SystemException { + } + + @Override + public void setRollbackOnly() throws IllegalStateException, SystemException { + } + }; + + } + + @Override + public void resume(Transaction tobj) throws IllegalStateException, InvalidTransactionException, SystemException { + } + + @Override + public void rollback() throws IllegalStateException, SecurityException, SystemException { + } + + @Override + public void setRollbackOnly() throws IllegalStateException, SystemException { + } + + @Override + public void setTransactionTimeout(int seconds) throws SystemException { + } + + @Override + public Transaction suspend() throws SystemException { + return null; + } + }); + + TopicConnection connection = (TopicConnection) pcf.createConnection(); + TopicSession session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); + + assertEquals("client ack is enforce", Session.CLIENT_ACKNOWLEDGE, session.getAcknowledgeMode()); + TopicPublisher publisher = session.createPublisher(topic); + publisher.publish(session.createMessage()); + + // simulate a commit + for (Synchronization sync : syncs) { + sync.beforeCompletion(); + } + for (Synchronization sync : syncs) { + sync.afterCompletion(1); + } + connection.close(); + } + public void testInstanceOf() throws Exception { XaPooledConnectionFactory pcf = new XaPooledConnectionFactory(); assertTrue(pcf instanceof QueueConnectionFactory);