mirror of https://github.com/apache/activemq.git
https://issues.apache.org/activemq/browse/AMQ-2950 - xa transaction rollback on connection close
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1002604 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
d28be4dfb4
commit
c0daffa381
|
@ -251,6 +251,16 @@ public class TransactionBroker extends BrokerFilter {
|
||||||
}
|
}
|
||||||
iter.remove();
|
iter.remove();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
for (Transaction tx : xaTransactions.values()) {
|
||||||
|
try {
|
||||||
|
if (!tx.isPrepared()) {
|
||||||
|
tx.rollback();
|
||||||
|
}
|
||||||
|
} catch (Exception e) {
|
||||||
|
LOG.warn("ERROR Rolling back disconnected client's xa transactions: ", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
next.removeConnection(context, info, error);
|
next.removeConnection(context, info, error);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -301,6 +301,31 @@ public class ActiveMQXAConnectionFactoryTest extends CombinationTestSupport {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void testCloseSendConnection() throws Exception {
|
||||||
|
String brokerName = "closeSend";
|
||||||
|
BrokerService broker = BrokerFactory.createBroker(new URI("broker:(tcp://localhost:0)/" + brokerName));
|
||||||
|
broker.start();
|
||||||
|
broker.waitUntilStarted();
|
||||||
|
ActiveMQXAConnectionFactory cf = new ActiveMQXAConnectionFactory(broker.getTransportConnectors().get(0).getConnectUri());
|
||||||
|
XAConnection connection = (XAConnection)cf.createConnection();
|
||||||
|
connection.start();
|
||||||
|
XASession session = connection.createXASession();
|
||||||
|
XAResource resource = session.getXAResource();
|
||||||
|
Destination dest = new ActiveMQQueue(getName());
|
||||||
|
|
||||||
|
// publish a message
|
||||||
|
Xid tid = createXid();
|
||||||
|
resource.start(tid, XAResource.TMNOFLAGS);
|
||||||
|
MessageProducer producer = session.createProducer(dest);
|
||||||
|
ActiveMQTextMessage message = new ActiveMQTextMessage();
|
||||||
|
message.setText(getName());
|
||||||
|
producer.send(message);
|
||||||
|
|
||||||
|
connection.close();
|
||||||
|
|
||||||
|
assertTransactionGoneFromBroker(tid);
|
||||||
|
}
|
||||||
|
|
||||||
private void assertTransactionGoneFromFailoverState(
|
private void assertTransactionGoneFromFailoverState(
|
||||||
ActiveMQXAConnection connection1, Xid tid) throws Exception {
|
ActiveMQXAConnection connection1, Xid tid) throws Exception {
|
||||||
|
|
||||||
|
@ -336,7 +361,7 @@ public class ActiveMQXAConnectionFactoryTest extends CombinationTestSupport {
|
||||||
TransactionBroker transactionBroker = (TransactionBroker)broker.getBroker().getAdaptor(TransactionBroker.class);
|
TransactionBroker transactionBroker = (TransactionBroker)broker.getBroker().getAdaptor(TransactionBroker.class);
|
||||||
try {
|
try {
|
||||||
transactionBroker.getTransaction(null, new XATransactionId(tid), false);
|
transactionBroker.getTransaction(null, new XATransactionId(tid), false);
|
||||||
fail("expecte ex on tx not found");
|
fail("expected exception on tx not found");
|
||||||
} catch (XAException expectedOnNotFound) {
|
} catch (XAException expectedOnNotFound) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue