[AMQ-7485] add check for rollbackonly flag in session send such that failed ended transactions prevent further work till next transaction boundary

This commit is contained in:
gtully 2020-05-15 15:39:56 +01:00
parent 41bef94293
commit 0ebb0f88ef
6 changed files with 142 additions and 2 deletions

View File

@ -1926,6 +1926,9 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
synchronized (sendMutex) {
// tell the Broker we are about to start a new transaction
doStartTransaction();
if (transactionContext.isRollbackOnly()) {
throw new IllegalStateException("transaction marked rollback only");
}
TransactionId txid = transactionContext.getTransactionId();
long sequenceNumber = producer.getMessageSequence();

View File

@ -111,6 +111,10 @@ public class TransactionContext implements XAResource {
rollbackOnly = val;
}
public boolean isRollbackOnly() {
return rollbackOnly;
}
public boolean isInLocalTransaction() {
return transactionId != null && transactionId.isLocalTransaction();
}

View File

@ -102,6 +102,11 @@ public class LocalAndXATransaction implements XAResource, LocalTransaction {
} catch (JMSException e) {
throw (XAException)new XAException(XAException.XAER_PROTO).initCause(e);
}
if ((arg1 & TMFAIL) != 0) {
// do no further work in this context
LOG.debug("Marking transaction: {} rollbackOnly", this);
transactionContext.setRollbackOnly(true);
}
}
}

View File

@ -113,9 +113,13 @@ public class ManagedTransactionContext extends TransactionContext {
}
}
public boolean isRollbackOnly() {
return sharedContext.isRollbackOnly() || super.isRollbackOnly();
}
public boolean isInXATransaction() {
if (useSharedTxContext) {
// context considers endesd XA transactions as active, so just check for presence
// context considers ended XA transactions as active, so just check for presence
// of tx when it is shared
return sharedContext.isInTransaction();
} else {

View File

@ -20,8 +20,12 @@ import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.resource.spi.ManagedConnection;
import javax.transaction.xa.XAResource;
@ -33,9 +37,11 @@ import org.apache.activemq.ActiveMQPrefetchPolicy;
import org.apache.activemq.JmsQueueTransactionTest;
import org.apache.activemq.broker.BrokerFactory;
import org.apache.activemq.broker.BrokerService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class JmsXAQueueTransactionTest extends JmsQueueTransactionTest {
private static final Logger LOG = LoggerFactory.getLogger(JmsXAQueueTransactionTest.class);
private ConnectionManagerAdapter connectionManager = new ConnectionManagerAdapter();
private ActiveMQManagedConnectionFactory managedConnectionFactory;
private XAResource xaResource;
@ -113,6 +119,12 @@ public class JmsXAQueueTransactionTest extends JmsQueueTransactionTest {
xid = null;
}
protected void abortTx() throws Exception {
xaResource.end(xid, XAResource.TMFAIL);
xaResource.rollback(xid);
xid = null;
}
//This test won't work with xa tx it is overridden to do nothing here
@Override
public void testMessageListener() throws Exception {
@ -130,6 +142,84 @@ public class JmsXAQueueTransactionTest extends JmsQueueTransactionTest {
public void testSendSessionClose() throws Exception {
}
public void testSendOnAbortedXATx() throws Exception {
connection.close();
ConnectionFactory connectionFactory = newConnectionFactory();
connection = connectionFactory.createConnection();
connection.start();
ManagedConnectionProxy proxy = (ManagedConnectionProxy) connection;
ManagedConnection mc = proxy.getManagedConnection();
xaResource = mc.getXAResource();
beginTx();
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(destination);
abortTx();
try {
producer.send(session.createTextMessage("my tx aborted!"));
fail("expect error on send with rolled back tx");
} catch (JMSException expected) {
assertTrue("matches expected message", expected.getLocalizedMessage().contains("rollback only"));
expected.printStackTrace();
}
connection.close();
}
public void testReceiveTwoThenAbort() throws Exception {
Message[] outbound = new Message[] {session.createTextMessage("First Message"), session.createTextMessage("Second Message")};
// lets consume any outstanding messages from prev test runs
beginTx();
while (consumer.receive(1000) != null) {
}
commitTx();
//
beginTx();
producer.send(outbound[0]);
producer.send(outbound[1]);
commitTx();
LOG.info("Sent 0: " + outbound[0]);
LOG.info("Sent 1: " + outbound[1]);
ArrayList<Message> messages = new ArrayList<Message>();
beginTx();
Message message = consumer.receive(1000);
assertEquals(outbound[0], message);
message = consumer.receive(1000);
assertNotNull(message);
assertEquals(outbound[1], message);
abortTx();
// Consume again.. the prev message should
// get redelivered.
beginTx();
message = consumer.receive(5000);
assertNotNull("Should have re-received the first message again!", message);
messages.add(message);
assertEquals(outbound[0], message);
message = consumer.receive(5000);
assertNotNull("Should have re-received the second message again!", message);
messages.add(message);
assertEquals(outbound[1], message);
assertNull(consumer.receiveNoWait());
commitTx();
Message inbound[] = new Message[messages.size()];
messages.toArray(inbound);
assertTextMessagesEqual("Rollback did not work", outbound, inbound);
}
public Xid createXid() throws IOException {
ByteArrayOutputStream baos = new ByteArrayOutputStream();

View File

@ -459,6 +459,40 @@ public class ActiveMQXAConnectionFactoryTest extends CombinationTestSupport {
} catch (javax.jms.IllegalStateException expected) {}
}
public void testProducerFailAfterRollbackOnly() throws Exception {
ActiveMQConnectionFactory cf1 = getXAConnectionFactory("vm://localhost?broker.persistent=false");
XAConnection connection1 = (XAConnection)cf1.createConnection();
connection1.start();
XASession session = connection1.createXASession();
XAResource resource = session.getXAResource();
Destination dest = new ActiveMQQueue(getName());
// publish a message
Xid tid = createXid();
resource.start(tid, XAResource.TMNOFLAGS);
MessageProducer producer = session.createProducer(dest);
ActiveMQTextMessage message = new ActiveMQTextMessage();
message.setText(getName());
// can happen out of band with XA via RAR
resource.end(tid, XAResource.TMFAIL);
((ActiveMQSession)session).getTransactionContext().setRollbackOnly(true);
try {
producer.send(message);
fail("expect error on setRollbackOnly");
} catch (JMSException expected) {}
// rollback only state does not linger
tid = createXid();
resource.start(tid, XAResource.TMNOFLAGS);
producer.send(message);
resource.end(tid, XAResource.TMSUCCESS);
resource.commit(tid, true);
connection1.close();
}
public void testRollbackXaErrorCode() throws Exception {
String brokerName = "rollbackErrorCode";
BrokerService broker = BrokerFactory.createBroker(new URI("broker:(tcp://localhost:0)/" + brokerName));