From 0ebb0f88ef2afc590bf19ba1ec08ed995669a9dc Mon Sep 17 00:00:00 2001 From: gtully Date: Fri, 15 May 2020 15:39:56 +0100 Subject: [PATCH] [AMQ-7485] add check for rollbackonly flag in session send such that failed ended transactions prevent further work till next transaction boundary --- .../org/apache/activemq/ActiveMQSession.java | 3 + .../apache/activemq/TransactionContext.java | 4 + .../activemq/ra/LocalAndXATransaction.java | 5 + .../ra/ManagedTransactionContext.java | 6 +- .../ra/JmsXAQueueTransactionTest.java | 92 ++++++++++++++++++- .../ActiveMQXAConnectionFactoryTest.java | 34 +++++++ 6 files changed, 142 insertions(+), 2 deletions(-) diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java index bef6f4e21c..8afb442d08 100644 --- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java +++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java @@ -1926,6 +1926,9 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta synchronized (sendMutex) { // tell the Broker we are about to start a new transaction doStartTransaction(); + if (transactionContext.isRollbackOnly()) { + throw new IllegalStateException("transaction marked rollback only"); + } TransactionId txid = transactionContext.getTransactionId(); long sequenceNumber = producer.getMessageSequence(); diff --git a/activemq-client/src/main/java/org/apache/activemq/TransactionContext.java b/activemq-client/src/main/java/org/apache/activemq/TransactionContext.java index 01fff404d3..bb5735fdc0 100644 --- a/activemq-client/src/main/java/org/apache/activemq/TransactionContext.java +++ b/activemq-client/src/main/java/org/apache/activemq/TransactionContext.java @@ -111,6 +111,10 @@ public class TransactionContext implements XAResource { rollbackOnly = val; } + public boolean isRollbackOnly() { + return rollbackOnly; + } + public boolean isInLocalTransaction() { return transactionId != null && transactionId.isLocalTransaction(); } diff --git a/activemq-ra/src/main/java/org/apache/activemq/ra/LocalAndXATransaction.java b/activemq-ra/src/main/java/org/apache/activemq/ra/LocalAndXATransaction.java index 75d691b1b5..d49cfb369b 100644 --- a/activemq-ra/src/main/java/org/apache/activemq/ra/LocalAndXATransaction.java +++ b/activemq-ra/src/main/java/org/apache/activemq/ra/LocalAndXATransaction.java @@ -102,6 +102,11 @@ public class LocalAndXATransaction implements XAResource, LocalTransaction { } catch (JMSException e) { throw (XAException)new XAException(XAException.XAER_PROTO).initCause(e); } + if ((arg1 & TMFAIL) != 0) { + // do no further work in this context + LOG.debug("Marking transaction: {} rollbackOnly", this); + transactionContext.setRollbackOnly(true); + } } } diff --git a/activemq-ra/src/main/java/org/apache/activemq/ra/ManagedTransactionContext.java b/activemq-ra/src/main/java/org/apache/activemq/ra/ManagedTransactionContext.java index 0f41f7969d..f0b25491c2 100644 --- a/activemq-ra/src/main/java/org/apache/activemq/ra/ManagedTransactionContext.java +++ b/activemq-ra/src/main/java/org/apache/activemq/ra/ManagedTransactionContext.java @@ -113,9 +113,13 @@ public class ManagedTransactionContext extends TransactionContext { } } + public boolean isRollbackOnly() { + return sharedContext.isRollbackOnly() || super.isRollbackOnly(); + } + public boolean isInXATransaction() { if (useSharedTxContext) { - // context considers endesd XA transactions as active, so just check for presence + // context considers ended XA transactions as active, so just check for presence // of tx when it is shared return sharedContext.isInTransaction(); } else { diff --git a/activemq-ra/src/test/java/org/apache/activemq/ra/JmsXAQueueTransactionTest.java b/activemq-ra/src/test/java/org/apache/activemq/ra/JmsXAQueueTransactionTest.java index f55325ea4c..4ea32935eb 100644 --- a/activemq-ra/src/test/java/org/apache/activemq/ra/JmsXAQueueTransactionTest.java +++ b/activemq-ra/src/test/java/org/apache/activemq/ra/JmsXAQueueTransactionTest.java @@ -20,8 +20,12 @@ import java.io.ByteArrayOutputStream; import java.io.DataOutputStream; import java.io.IOException; import java.net.URI; +import java.util.ArrayList; import javax.jms.ConnectionFactory; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageProducer; import javax.jms.Session; import javax.resource.spi.ManagedConnection; import javax.transaction.xa.XAResource; @@ -33,9 +37,11 @@ import org.apache.activemq.ActiveMQPrefetchPolicy; import org.apache.activemq.JmsQueueTransactionTest; import org.apache.activemq.broker.BrokerFactory; import org.apache.activemq.broker.BrokerService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class JmsXAQueueTransactionTest extends JmsQueueTransactionTest { - + private static final Logger LOG = LoggerFactory.getLogger(JmsXAQueueTransactionTest.class); private ConnectionManagerAdapter connectionManager = new ConnectionManagerAdapter(); private ActiveMQManagedConnectionFactory managedConnectionFactory; private XAResource xaResource; @@ -113,6 +119,12 @@ public class JmsXAQueueTransactionTest extends JmsQueueTransactionTest { xid = null; } + protected void abortTx() throws Exception { + xaResource.end(xid, XAResource.TMFAIL); + xaResource.rollback(xid); + xid = null; + } + //This test won't work with xa tx it is overridden to do nothing here @Override public void testMessageListener() throws Exception { @@ -130,6 +142,84 @@ public class JmsXAQueueTransactionTest extends JmsQueueTransactionTest { public void testSendSessionClose() throws Exception { } + public void testSendOnAbortedXATx() throws Exception { + connection.close(); + + ConnectionFactory connectionFactory = newConnectionFactory(); + connection = connectionFactory.createConnection(); + connection.start(); + + ManagedConnectionProxy proxy = (ManagedConnectionProxy) connection; + ManagedConnection mc = proxy.getManagedConnection(); + xaResource = mc.getXAResource(); + + beginTx(); + + Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); + + MessageProducer producer = session.createProducer(destination); + + abortTx(); + + try { + producer.send(session.createTextMessage("my tx aborted!")); + fail("expect error on send with rolled back tx"); + } catch (JMSException expected) { + assertTrue("matches expected message", expected.getLocalizedMessage().contains("rollback only")); + expected.printStackTrace(); + } + + connection.close(); + } + + public void testReceiveTwoThenAbort() throws Exception { + Message[] outbound = new Message[] {session.createTextMessage("First Message"), session.createTextMessage("Second Message")}; + + // lets consume any outstanding messages from prev test runs + beginTx(); + while (consumer.receive(1000) != null) { + } + commitTx(); + + // + beginTx(); + producer.send(outbound[0]); + producer.send(outbound[1]); + commitTx(); + + LOG.info("Sent 0: " + outbound[0]); + LOG.info("Sent 1: " + outbound[1]); + + ArrayList messages = new ArrayList(); + beginTx(); + Message message = consumer.receive(1000); + assertEquals(outbound[0], message); + + message = consumer.receive(1000); + assertNotNull(message); + assertEquals(outbound[1], message); + abortTx(); + + // Consume again.. the prev message should + // get redelivered. + beginTx(); + message = consumer.receive(5000); + assertNotNull("Should have re-received the first message again!", message); + messages.add(message); + assertEquals(outbound[0], message); + message = consumer.receive(5000); + assertNotNull("Should have re-received the second message again!", message); + messages.add(message); + assertEquals(outbound[1], message); + + assertNull(consumer.receiveNoWait()); + commitTx(); + + Message inbound[] = new Message[messages.size()]; + messages.toArray(inbound); + assertTextMessagesEqual("Rollback did not work", outbound, inbound); + } + public Xid createXid() throws IOException { ByteArrayOutputStream baos = new ByteArrayOutputStream(); diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/ActiveMQXAConnectionFactoryTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/ActiveMQXAConnectionFactoryTest.java index 3055181779..c48420a97b 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/ActiveMQXAConnectionFactoryTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/ActiveMQXAConnectionFactoryTest.java @@ -459,6 +459,40 @@ public class ActiveMQXAConnectionFactoryTest extends CombinationTestSupport { } catch (javax.jms.IllegalStateException expected) {} } + public void testProducerFailAfterRollbackOnly() throws Exception { + + ActiveMQConnectionFactory cf1 = getXAConnectionFactory("vm://localhost?broker.persistent=false"); + XAConnection connection1 = (XAConnection)cf1.createConnection(); + connection1.start(); + + XASession session = connection1.createXASession(); + XAResource resource = session.getXAResource(); + Destination dest = new ActiveMQQueue(getName()); + + // publish a message + Xid tid = createXid(); + resource.start(tid, XAResource.TMNOFLAGS); + MessageProducer producer = session.createProducer(dest); + ActiveMQTextMessage message = new ActiveMQTextMessage(); + message.setText(getName()); + + // can happen out of band with XA via RAR + resource.end(tid, XAResource.TMFAIL); + ((ActiveMQSession)session).getTransactionContext().setRollbackOnly(true); + try { + producer.send(message); + fail("expect error on setRollbackOnly"); + } catch (JMSException expected) {} + + // rollback only state does not linger + tid = createXid(); + resource.start(tid, XAResource.TMNOFLAGS); + producer.send(message); + resource.end(tid, XAResource.TMSUCCESS); + resource.commit(tid, true); + connection1.close(); + } + public void testRollbackXaErrorCode() throws Exception { String brokerName = "rollbackErrorCode"; BrokerService broker = BrokerFactory.createBroker(new URI("broker:(tcp://localhost:0)/" + brokerName));