From e258bdf188ebb1ec98d6b816202f4a9503d5fb28 Mon Sep 17 00:00:00 2001 From: Howard Gao Date: Wed, 21 Jun 2017 23:05:41 +0800 Subject: [PATCH] ARTEMIS-1242 OpenWire Transactions never removed Openwire doesn't remove the finished transactions (committed or rolledback). --- .../protocol/openwire/OpenWireConnection.java | 105 ++++++++++++++++-- .../openwire/SimpleOpenWireTest.java | 94 ++++++++++++++++ 2 files changed, 191 insertions(+), 8 deletions(-) diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java index a56901e638..06113b9736 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java @@ -70,6 +70,7 @@ import org.apache.activemq.artemis.core.server.ServerSession; import org.apache.activemq.artemis.core.server.SlowConsumerDetectionListener; import org.apache.activemq.artemis.core.server.TempQueueObserver; import org.apache.activemq.artemis.core.server.impl.RefsOperation; +import org.apache.activemq.artemis.core.transaction.ResourceManager; import org.apache.activemq.artemis.core.transaction.Transaction; import org.apache.activemq.artemis.core.transaction.TransactionOperationAbstract; import org.apache.activemq.artemis.core.transaction.TransactionPropertyIndexes; @@ -1101,13 +1102,13 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se @Override public Response processRollbackTransaction(TransactionInfo info) throws Exception { - Transaction tx = lookupTX(info.getTransactionId(), null); + Transaction tx = lookupTX(info.getTransactionId(), null, true); + AMQSession amqSession = (AMQSession) tx.getProtocolData(); + if (info.getTransactionId().isXATransaction() && tx == null) { throw newXAException("Transaction '" + info.getTransactionId() + "' has not been started.", XAException.XAER_NOTA); } else if (tx != null) { - AMQSession amqSession = (AMQSession) tx.getProtocolData(); - if (amqSession != null) { amqSession.getCoreSession().resetTX(tx); @@ -1117,6 +1118,54 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se amqSession.getCoreSession().resetTX(null); } } + } + + if (info.getTransactionId().isXATransaction()) { + ResourceManager resourceManager = server.getResourceManager(); + Xid xid = OpenWireUtil.toXID(info.getTransactionId()); + + if (tx == null) { + if (resourceManager.getHeuristicCommittedTransactions().contains(xid)) { + XAException ex = new XAException("transaction has been heuristically committed: " + xid); + ex.errorCode = XAException.XA_HEURCOM; + throw ex; + } else if (resourceManager.getHeuristicRolledbackTransactions().contains(xid)) { + // checked heuristic rolled back transactions + XAException ex = new XAException("transaction has been heuristically rolled back: " + xid); + ex.errorCode = XAException.XA_HEURRB; + throw ex; + } else { + if (logger.isTraceEnabled()) { + logger.trace("xarollback into " + tx + ", xid=" + xid + " forcing a rollback regular"); + } + + try { + if (amqSession != null) { + amqSession.getCoreSession().rollback(false); + } + } catch (Exception e) { + ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e); + } + + XAException ex = new XAException("Cannot find xid in resource manager: " + xid); + ex.errorCode = XAException.XAER_NOTA; + throw ex; + } + } else { + if (tx.getState() == Transaction.State.SUSPENDED) { + if (logger.isTraceEnabled()) { + logger.trace("xarollback into " + tx + " sending tx back as it was suspended"); + } + // Put it back + resourceManager.putTransaction(xid, tx); + XAException ex = new XAException("Cannot commit transaction, it is suspended " + xid); + ex.errorCode = XAException.XAER_PROTO; + throw ex; + } else { + tx.rollback(); + } + } + } else { tx.rollback(); } @@ -1229,11 +1278,47 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se private Response processCommit(TransactionInfo info, boolean onePhase) throws Exception { TransactionId txID = info.getTransactionId(); - Transaction tx = lookupTX(txID, null); + Transaction tx = lookupTX(txID, null, true); - AMQSession session = (AMQSession) tx.getProtocolData(); + if (txID.isXATransaction()) { + ResourceManager resourceManager = server.getResourceManager(); + Xid xid = OpenWireUtil.toXID(txID); + if (logger.isTraceEnabled()) { + logger.trace("XAcommit into " + tx + ", xid=" + xid); + } - tx.commit(onePhase); + if (tx == null) { + if (resourceManager.getHeuristicCommittedTransactions().contains(xid)) { + XAException ex = new XAException("transaction has been heuristically committed: " + xid); + ex.errorCode = XAException.XA_HEURCOM; + throw ex; + } else if (resourceManager.getHeuristicRolledbackTransactions().contains(xid)) { + // checked heuristic rolled back transactions + XAException ex = new XAException("transaction has been heuristically rolled back: " + xid); + ex.errorCode = XAException.XA_HEURRB; + throw ex; + } else { + if (logger.isTraceEnabled()) { + logger.trace("XAcommit into " + tx + ", xid=" + xid + " cannot find it"); + } + XAException ex = new XAException("Cannot find xid in resource manager: " + xid); + ex.errorCode = XAException.XAER_NOTA; + throw ex; + } + } else { + if (tx.getState() == Transaction.State.SUSPENDED) { + // Put it back + resourceManager.putTransaction(xid, tx); + XAException ex = new XAException("Cannot commit transaction, it is suspended " + xid); + ex.errorCode = XAException.XAER_PROTO; + throw ex; + } else { + tx.commit(onePhase); + } + } + } else { + tx.commit(onePhase); + } return null; } @@ -1485,6 +1570,10 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se } private Transaction lookupTX(TransactionId txID, AMQSession session) throws IllegalStateException { + return lookupTX(txID, session, false); + } + + private Transaction lookupTX(TransactionId txID, AMQSession session, boolean remove) throws IllegalStateException { if (txID == null) { return null; } @@ -1493,9 +1582,9 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se Transaction transaction; if (txID.isXATransaction()) { xid = OpenWireUtil.toXID(txID); - transaction = server.getResourceManager().getTransaction(xid); + transaction = remove ? server.getResourceManager().removeTransaction(xid) : server.getResourceManager().getTransaction(xid); } else { - transaction = txMap.get(txID); + transaction = remove ? txMap.remove(txID) : txMap.get(txID); } if (transaction == null) { diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/SimpleOpenWireTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/SimpleOpenWireTest.java index a41acc63f8..fcd85ef87c 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/SimpleOpenWireTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/SimpleOpenWireTest.java @@ -44,6 +44,7 @@ import javax.jms.TopicSession; import javax.jms.TopicSubscriber; import javax.jms.XAConnection; import javax.jms.XASession; +import javax.transaction.xa.XAException; import javax.transaction.xa.XAResource; import javax.transaction.xa.Xid; import java.util.ArrayList; @@ -63,6 +64,8 @@ import org.apache.activemq.artemis.core.postoffice.PostOffice; import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding; import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; +import org.apache.activemq.artemis.core.transaction.Transaction; +import org.apache.activemq.artemis.core.transaction.impl.XidImpl; import org.apache.activemq.artemis.tests.util.Wait; import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQTopic; @@ -1490,6 +1493,97 @@ public class SimpleOpenWireTest extends BasicOpenWireTest { } } + @Test + public void testXAResourceCommitSuspendedNotRemoved() throws Exception { + Queue queue = null; + + Xid xid = newXID(); + try (XAConnection xaconnection = xaFactory.createXAConnection()) { + XASession session = xaconnection.createXASession(); + queue = session.createQueue(queueName); + session.getXAResource().start(xid, XAResource.TMNOFLAGS); + session.getXAResource().end(xid, XAResource.TMSUSPEND); + + XidImpl xid1 = new XidImpl(xid); + Transaction transaction = server.getResourceManager().getTransaction(xid1); + //amq5.x doesn't pass suspend flags to broker, + //directly suspend the tx + transaction.suspend(); + + session.getXAResource().commit(xid, true); + } catch (XAException ex) { + //ignore + } finally { + XidImpl xid1 = new XidImpl(xid); + Transaction transaction = server.getResourceManager().getTransaction(xid1); + assertNotNull(transaction); + } + } + + @Test + public void testXAResourceRolledBackSuspendedNotRemoved() throws Exception { + Queue queue = null; + + Xid xid = newXID(); + try (XAConnection xaconnection = xaFactory.createXAConnection()) { + XASession session = xaconnection.createXASession(); + queue = session.createQueue(queueName); + session.getXAResource().start(xid, XAResource.TMNOFLAGS); + session.getXAResource().end(xid, XAResource.TMSUSPEND); + + XidImpl xid1 = new XidImpl(xid); + Transaction transaction = server.getResourceManager().getTransaction(xid1); + //directly suspend the tx + transaction.suspend(); + + session.getXAResource().rollback(xid); + } catch (XAException ex) { + //ignore + } finally { + XidImpl xid1 = new XidImpl(xid); + Transaction transaction = server.getResourceManager().getTransaction(xid1); + assertNotNull(transaction); + } + } + + @Test + public void testXAResourceCommittedRemoved() throws Exception { + Queue queue = null; + + Xid xid = newXID(); + try (XAConnection xaconnection = xaFactory.createXAConnection()) { + XASession session = xaconnection.createXASession(); + queue = session.createQueue(queueName); + session.getXAResource().start(xid, XAResource.TMNOFLAGS); + MessageProducer producer = session.createProducer(queue); + producer.send(session.createTextMessage("xa message")); + session.getXAResource().end(xid, XAResource.TMSUCCESS); + session.getXAResource().commit(xid, true); + } + XidImpl xid1 = new XidImpl(xid); + Transaction transaction = server.getResourceManager().getTransaction(xid1); + assertNull(transaction); + } + + @Test + public void testXAResourceRolledBackRemoved() throws Exception { + Queue queue = null; + + Xid xid = newXID(); + try (XAConnection xaconnection = xaFactory.createXAConnection()) { + XASession session = xaconnection.createXASession(); + queue = session.createQueue(queueName); + session.getXAResource().start(xid, XAResource.TMNOFLAGS); + MessageProducer producer = session.createProducer(queue); + producer.send(session.createTextMessage("xa message")); + session.getXAResource().end(xid, XAResource.TMSUCCESS); + session.getXAResource().rollback(xid); + } + XidImpl xid1 = new XidImpl(xid); + Transaction transaction = server.getResourceManager().getTransaction(xid1); + assertNull(transaction); + } + private void checkQueueEmpty(String qName) { PostOffice po = server.getPostOffice(); LocalQueueBinding binding = (LocalQueueBinding) po.getBinding(SimpleString.toSimpleString(qName));