ARTEMIS-1242 OpenWire Transactions never removed
Openwire doesn't remove the finished transactions (committed or rolledback).
This commit is contained in:
parent
a8ecc9fa38
commit
e258bdf188
|
@ -70,6 +70,7 @@ import org.apache.activemq.artemis.core.server.ServerSession;
|
|||
import org.apache.activemq.artemis.core.server.SlowConsumerDetectionListener;
|
||||
import org.apache.activemq.artemis.core.server.TempQueueObserver;
|
||||
import org.apache.activemq.artemis.core.server.impl.RefsOperation;
|
||||
import org.apache.activemq.artemis.core.transaction.ResourceManager;
|
||||
import org.apache.activemq.artemis.core.transaction.Transaction;
|
||||
import org.apache.activemq.artemis.core.transaction.TransactionOperationAbstract;
|
||||
import org.apache.activemq.artemis.core.transaction.TransactionPropertyIndexes;
|
||||
|
@ -1101,13 +1102,13 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
|
|||
|
||||
@Override
|
||||
public Response processRollbackTransaction(TransactionInfo info) throws Exception {
|
||||
Transaction tx = lookupTX(info.getTransactionId(), null);
|
||||
Transaction tx = lookupTX(info.getTransactionId(), null, true);
|
||||
AMQSession amqSession = (AMQSession) tx.getProtocolData();
|
||||
|
||||
if (info.getTransactionId().isXATransaction() && tx == null) {
|
||||
throw newXAException("Transaction '" + info.getTransactionId() + "' has not been started.", XAException.XAER_NOTA);
|
||||
} else if (tx != null) {
|
||||
|
||||
AMQSession amqSession = (AMQSession) tx.getProtocolData();
|
||||
|
||||
if (amqSession != null) {
|
||||
amqSession.getCoreSession().resetTX(tx);
|
||||
|
||||
|
@ -1117,6 +1118,54 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
|
|||
amqSession.getCoreSession().resetTX(null);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (info.getTransactionId().isXATransaction()) {
|
||||
ResourceManager resourceManager = server.getResourceManager();
|
||||
Xid xid = OpenWireUtil.toXID(info.getTransactionId());
|
||||
|
||||
if (tx == null) {
|
||||
if (resourceManager.getHeuristicCommittedTransactions().contains(xid)) {
|
||||
XAException ex = new XAException("transaction has been heuristically committed: " + xid);
|
||||
ex.errorCode = XAException.XA_HEURCOM;
|
||||
throw ex;
|
||||
} else if (resourceManager.getHeuristicRolledbackTransactions().contains(xid)) {
|
||||
// checked heuristic rolled back transactions
|
||||
XAException ex = new XAException("transaction has been heuristically rolled back: " + xid);
|
||||
ex.errorCode = XAException.XA_HEURRB;
|
||||
throw ex;
|
||||
} else {
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("xarollback into " + tx + ", xid=" + xid + " forcing a rollback regular");
|
||||
}
|
||||
|
||||
try {
|
||||
if (amqSession != null) {
|
||||
amqSession.getCoreSession().rollback(false);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
|
||||
}
|
||||
|
||||
XAException ex = new XAException("Cannot find xid in resource manager: " + xid);
|
||||
ex.errorCode = XAException.XAER_NOTA;
|
||||
throw ex;
|
||||
}
|
||||
} else {
|
||||
if (tx.getState() == Transaction.State.SUSPENDED) {
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("xarollback into " + tx + " sending tx back as it was suspended");
|
||||
}
|
||||
// Put it back
|
||||
resourceManager.putTransaction(xid, tx);
|
||||
XAException ex = new XAException("Cannot commit transaction, it is suspended " + xid);
|
||||
ex.errorCode = XAException.XAER_PROTO;
|
||||
throw ex;
|
||||
} else {
|
||||
tx.rollback();
|
||||
}
|
||||
}
|
||||
} else {
|
||||
tx.rollback();
|
||||
}
|
||||
|
||||
|
@ -1229,11 +1278,47 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
|
|||
private Response processCommit(TransactionInfo info, boolean onePhase) throws Exception {
|
||||
TransactionId txID = info.getTransactionId();
|
||||
|
||||
Transaction tx = lookupTX(txID, null);
|
||||
Transaction tx = lookupTX(txID, null, true);
|
||||
|
||||
AMQSession session = (AMQSession) tx.getProtocolData();
|
||||
if (txID.isXATransaction()) {
|
||||
ResourceManager resourceManager = server.getResourceManager();
|
||||
Xid xid = OpenWireUtil.toXID(txID);
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("XAcommit into " + tx + ", xid=" + xid);
|
||||
}
|
||||
|
||||
if (tx == null) {
|
||||
if (resourceManager.getHeuristicCommittedTransactions().contains(xid)) {
|
||||
XAException ex = new XAException("transaction has been heuristically committed: " + xid);
|
||||
ex.errorCode = XAException.XA_HEURCOM;
|
||||
throw ex;
|
||||
} else if (resourceManager.getHeuristicRolledbackTransactions().contains(xid)) {
|
||||
// checked heuristic rolled back transactions
|
||||
XAException ex = new XAException("transaction has been heuristically rolled back: " + xid);
|
||||
ex.errorCode = XAException.XA_HEURRB;
|
||||
throw ex;
|
||||
} else {
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("XAcommit into " + tx + ", xid=" + xid + " cannot find it");
|
||||
}
|
||||
XAException ex = new XAException("Cannot find xid in resource manager: " + xid);
|
||||
ex.errorCode = XAException.XAER_NOTA;
|
||||
throw ex;
|
||||
}
|
||||
} else {
|
||||
if (tx.getState() == Transaction.State.SUSPENDED) {
|
||||
// Put it back
|
||||
resourceManager.putTransaction(xid, tx);
|
||||
XAException ex = new XAException("Cannot commit transaction, it is suspended " + xid);
|
||||
ex.errorCode = XAException.XAER_PROTO;
|
||||
throw ex;
|
||||
} else {
|
||||
tx.commit(onePhase);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
tx.commit(onePhase);
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
|
@ -1485,6 +1570,10 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
|
|||
}
|
||||
|
||||
private Transaction lookupTX(TransactionId txID, AMQSession session) throws IllegalStateException {
|
||||
return lookupTX(txID, session, false);
|
||||
}
|
||||
|
||||
private Transaction lookupTX(TransactionId txID, AMQSession session, boolean remove) throws IllegalStateException {
|
||||
if (txID == null) {
|
||||
return null;
|
||||
}
|
||||
|
@ -1493,9 +1582,9 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
|
|||
Transaction transaction;
|
||||
if (txID.isXATransaction()) {
|
||||
xid = OpenWireUtil.toXID(txID);
|
||||
transaction = server.getResourceManager().getTransaction(xid);
|
||||
transaction = remove ? server.getResourceManager().removeTransaction(xid) : server.getResourceManager().getTransaction(xid);
|
||||
} else {
|
||||
transaction = txMap.get(txID);
|
||||
transaction = remove ? txMap.remove(txID) : txMap.get(txID);
|
||||
}
|
||||
|
||||
if (transaction == null) {
|
||||
|
|
|
@ -44,6 +44,7 @@ import javax.jms.TopicSession;
|
|||
import javax.jms.TopicSubscriber;
|
||||
import javax.jms.XAConnection;
|
||||
import javax.jms.XASession;
|
||||
import javax.transaction.xa.XAException;
|
||||
import javax.transaction.xa.XAResource;
|
||||
import javax.transaction.xa.Xid;
|
||||
import java.util.ArrayList;
|
||||
|
@ -63,6 +64,8 @@ import org.apache.activemq.artemis.core.postoffice.PostOffice;
|
|||
import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
|
||||
import org.apache.activemq.artemis.api.core.RoutingType;
|
||||
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
|
||||
import org.apache.activemq.artemis.core.transaction.Transaction;
|
||||
import org.apache.activemq.artemis.core.transaction.impl.XidImpl;
|
||||
import org.apache.activemq.artemis.tests.util.Wait;
|
||||
import org.apache.activemq.command.ActiveMQQueue;
|
||||
import org.apache.activemq.command.ActiveMQTopic;
|
||||
|
@ -1490,6 +1493,97 @@ public class SimpleOpenWireTest extends BasicOpenWireTest {
|
|||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testXAResourceCommitSuspendedNotRemoved() throws Exception {
|
||||
Queue queue = null;
|
||||
|
||||
Xid xid = newXID();
|
||||
try (XAConnection xaconnection = xaFactory.createXAConnection()) {
|
||||
XASession session = xaconnection.createXASession();
|
||||
queue = session.createQueue(queueName);
|
||||
session.getXAResource().start(xid, XAResource.TMNOFLAGS);
|
||||
session.getXAResource().end(xid, XAResource.TMSUSPEND);
|
||||
|
||||
XidImpl xid1 = new XidImpl(xid);
|
||||
Transaction transaction = server.getResourceManager().getTransaction(xid1);
|
||||
//amq5.x doesn't pass suspend flags to broker,
|
||||
//directly suspend the tx
|
||||
transaction.suspend();
|
||||
|
||||
session.getXAResource().commit(xid, true);
|
||||
} catch (XAException ex) {
|
||||
//ignore
|
||||
} finally {
|
||||
XidImpl xid1 = new XidImpl(xid);
|
||||
Transaction transaction = server.getResourceManager().getTransaction(xid1);
|
||||
assertNotNull(transaction);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testXAResourceRolledBackSuspendedNotRemoved() throws Exception {
|
||||
Queue queue = null;
|
||||
|
||||
Xid xid = newXID();
|
||||
try (XAConnection xaconnection = xaFactory.createXAConnection()) {
|
||||
XASession session = xaconnection.createXASession();
|
||||
queue = session.createQueue(queueName);
|
||||
session.getXAResource().start(xid, XAResource.TMNOFLAGS);
|
||||
session.getXAResource().end(xid, XAResource.TMSUSPEND);
|
||||
|
||||
XidImpl xid1 = new XidImpl(xid);
|
||||
Transaction transaction = server.getResourceManager().getTransaction(xid1);
|
||||
//directly suspend the tx
|
||||
transaction.suspend();
|
||||
|
||||
session.getXAResource().rollback(xid);
|
||||
} catch (XAException ex) {
|
||||
//ignore
|
||||
} finally {
|
||||
XidImpl xid1 = new XidImpl(xid);
|
||||
Transaction transaction = server.getResourceManager().getTransaction(xid1);
|
||||
assertNotNull(transaction);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testXAResourceCommittedRemoved() throws Exception {
|
||||
Queue queue = null;
|
||||
|
||||
Xid xid = newXID();
|
||||
try (XAConnection xaconnection = xaFactory.createXAConnection()) {
|
||||
XASession session = xaconnection.createXASession();
|
||||
queue = session.createQueue(queueName);
|
||||
session.getXAResource().start(xid, XAResource.TMNOFLAGS);
|
||||
MessageProducer producer = session.createProducer(queue);
|
||||
producer.send(session.createTextMessage("xa message"));
|
||||
session.getXAResource().end(xid, XAResource.TMSUCCESS);
|
||||
session.getXAResource().commit(xid, true);
|
||||
}
|
||||
XidImpl xid1 = new XidImpl(xid);
|
||||
Transaction transaction = server.getResourceManager().getTransaction(xid1);
|
||||
assertNull(transaction);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testXAResourceRolledBackRemoved() throws Exception {
|
||||
Queue queue = null;
|
||||
|
||||
Xid xid = newXID();
|
||||
try (XAConnection xaconnection = xaFactory.createXAConnection()) {
|
||||
XASession session = xaconnection.createXASession();
|
||||
queue = session.createQueue(queueName);
|
||||
session.getXAResource().start(xid, XAResource.TMNOFLAGS);
|
||||
MessageProducer producer = session.createProducer(queue);
|
||||
producer.send(session.createTextMessage("xa message"));
|
||||
session.getXAResource().end(xid, XAResource.TMSUCCESS);
|
||||
session.getXAResource().rollback(xid);
|
||||
}
|
||||
XidImpl xid1 = new XidImpl(xid);
|
||||
Transaction transaction = server.getResourceManager().getTransaction(xid1);
|
||||
assertNull(transaction);
|
||||
}
|
||||
|
||||
private void checkQueueEmpty(String qName) {
|
||||
PostOffice po = server.getPostOffice();
|
||||
LocalQueueBinding binding = (LocalQueueBinding) po.getBinding(SimpleString.toSimpleString(qName));
|
||||
|
|
Loading…
Reference in New Issue