diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/TransactionBroker.java b/activemq-core/src/main/java/org/apache/activemq/broker/TransactionBroker.java index be8428cd42..e61d0ba264 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/TransactionBroker.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/TransactionBroker.java @@ -251,6 +251,16 @@ public class TransactionBroker extends BrokerFilter { } iter.remove(); } + + for (Transaction tx : xaTransactions.values()) { + try { + if (!tx.isPrepared()) { + tx.rollback(); + } + } catch (Exception e) { + LOG.warn("ERROR Rolling back disconnected client's xa transactions: ", e); + } + } next.removeConnection(context, info, error); } diff --git a/activemq-core/src/test/java/org/apache/activemq/ActiveMQXAConnectionFactoryTest.java b/activemq-core/src/test/java/org/apache/activemq/ActiveMQXAConnectionFactoryTest.java index d1a425d298..469a0315e3 100644 --- a/activemq-core/src/test/java/org/apache/activemq/ActiveMQXAConnectionFactoryTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/ActiveMQXAConnectionFactoryTest.java @@ -301,6 +301,31 @@ public class ActiveMQXAConnectionFactoryTest extends CombinationTestSupport { } + public void testCloseSendConnection() throws Exception { + String brokerName = "closeSend"; + BrokerService broker = BrokerFactory.createBroker(new URI("broker:(tcp://localhost:0)/" + brokerName)); + broker.start(); + broker.waitUntilStarted(); + ActiveMQXAConnectionFactory cf = new ActiveMQXAConnectionFactory(broker.getTransportConnectors().get(0).getConnectUri()); + XAConnection connection = (XAConnection)cf.createConnection(); + connection.start(); + XASession session = connection.createXASession(); + XAResource resource = session.getXAResource(); + Destination dest = new ActiveMQQueue(getName()); + + // publish a message + Xid tid = createXid(); + resource.start(tid, XAResource.TMNOFLAGS); + MessageProducer producer = session.createProducer(dest); + ActiveMQTextMessage message = new ActiveMQTextMessage(); + message.setText(getName()); + producer.send(message); + + connection.close(); + + assertTransactionGoneFromBroker(tid); + } + private void assertTransactionGoneFromFailoverState( ActiveMQXAConnection connection1, Xid tid) throws Exception { @@ -336,7 +361,7 @@ public class ActiveMQXAConnectionFactoryTest extends CombinationTestSupport { TransactionBroker transactionBroker = (TransactionBroker)broker.getBroker().getAdaptor(TransactionBroker.class); try { transactionBroker.getTransaction(null, new XATransactionId(tid), false); - fail("expecte ex on tx not found"); + fail("expected exception on tx not found"); } catch (XAException expectedOnNotFound) { } }