fix - AMQ-2034 - have close in XA transaction deferred to synchronisation after completion, have rollback call beforeEnd to propagate acknowledgements; add a bunch of tests

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@726764 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gary Tully 2008-12-15 18:48:51 +00:00
parent 7aec12eaca
commit 50a98e3b0e
8 changed files with 364 additions and 10 deletions

View File

@ -590,11 +590,27 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
*/
public void close() throws JMSException {
if (!unconsumedMessages.isClosed()) {
dispose();
this.session.asyncSendPacket(info.createRemoveCommand());
if (session.isTransacted() && session.getTransactionContext().getTransactionId() != null) {
session.getTransactionContext().addSynchronization(new Synchronization() {
public void afterCommit() throws Exception {
doClose();
}
public void afterRollback() throws Exception {
doClose();
}
});
} else {
doClose();
}
}
}
void doClose() throws JMSException {
dispose();
this.session.asyncSendPacket(info.createRemoveCommand());
}
void clearMessagesInProgress() {
// we are called from inside the transport reconnection logic
// which involves us clearing all the connections' consumers
@ -653,10 +669,14 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
// }
// Do we have any acks we need to send out before closing?
// Ack any delivered messages now. (session may still
// commit/rollback the acks).
// Ack any delivered messages now.
// only processes optimized acknowledgements
deliverAcks();
if (!session.isTransacted()) {
deliverAcks();
if (session.isDupsOkAcknowledge()) {
acknowledge();
}
}
if (executorService != null) {
executorService.shutdown();
try {
@ -665,9 +685,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
Thread.currentThread().interrupt();
}
}
if (session.isTransacted() || session.isDupsOkAcknowledge()) {
acknowledge();
}
if (session.isClientAcknowledge()) {
if (!this.info.isBrowser()) {
// rollback duplicates that aren't acknowledged

View File

@ -27,6 +27,7 @@ import javax.jms.XATopicSession;
import javax.transaction.xa.XAResource;
import org.apache.activemq.command.SessionId;
import org.apache.activemq.transaction.Synchronization;
/**
* The XASession interface extends the capability of Session by adding access
@ -96,6 +97,24 @@ public class ActiveMQXASession extends ActiveMQSession implements QueueSession,
return new ActiveMQTopicSession(this);
}
@Override
public void close() throws JMSException {
if (getTransactionContext().isInXATransaction()) {
getTransactionContext().addSynchronization(new Synchronization() {
public void afterCommit() throws Exception {
doClose();
}
public void afterRollback() throws Exception {
doClose();
}
});
}
}
void doClose() throws JMSException {
super.close();
}
/**
* This is called before transacted work is done by
* the session. XA Work can only be done when this

View File

@ -224,6 +224,7 @@ public class TransactionContext implements XAResource {
throw new TransactionInProgressException("Cannot rollback() if an XA transaction is already in progress ");
}
beforeEnd();
if (transactionId != null) {
TransactionInfo info = new TransactionInfo(getConnectionId(), transactionId, TransactionInfo.ROLLBACK);
this.transactionId = null;

View File

@ -16,27 +16,39 @@
*/
package org.apache.activemq;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.TextMessage;
import javax.jms.XAConnection;
import javax.jms.XAQueueConnection;
import javax.jms.XASession;
import javax.jms.XATopicConnection;
import javax.transaction.xa.XAException;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;
import org.apache.activemq.broker.BrokerRegistry;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTextMessage;
import org.apache.activemq.transport.stomp.StompTransportFilter;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
public class ActiveMQXAConnectionFactoryTest extends CombinationTestSupport {
private static final Log LOG = LogFactory.getLog(ActiveMQXAConnectionFactoryTest.class);
long txGenerator = System.currentTimeMillis();
public void testCopy() throws URISyntaxException, JMSException {
ActiveMQXAConnectionFactory cf = new ActiveMQXAConnectionFactory("vm://localhost?");
@ -117,6 +129,126 @@ public class ActiveMQXAConnectionFactoryTest extends CombinationTestSupport {
connection2.close();
}
public void testVanilaTransactionalProduceReceive() throws Exception {
ActiveMQXAConnectionFactory cf1 = new ActiveMQXAConnectionFactory("vm://localhost?broker.persistent=false");
XAConnection connection1 = (XAConnection)cf1.createConnection();
connection1.start();
XASession session = connection1.createXASession();
XAResource resource = session.getXAResource();
Destination dest = new ActiveMQQueue(getName());
// publish a message
Xid tid = createXid();
resource.start(tid, XAResource.TMNOFLAGS);
MessageProducer producer = session.createProducer(dest);
ActiveMQTextMessage message = new ActiveMQTextMessage();
message.setText(getName());
producer.send(message);
resource.end(tid, XAResource.TMSUCCESS);
resource.commit(tid, true);
session.close();
session = connection1.createXASession();
MessageConsumer consumer = session.createConsumer(dest);
tid = createXid();
resource = session.getXAResource();
resource.start(tid, XAResource.TMNOFLAGS);
TextMessage receivedMessage = (TextMessage) consumer.receive(1000);
assertNotNull(receivedMessage);
assertEquals(getName(), receivedMessage.getText());
resource.end(tid, XAResource.TMSUCCESS);
resource.commit(tid, true);
}
public void testConsumerCloseTransactionalSendReceive() throws Exception {
ActiveMQXAConnectionFactory cf1 = new ActiveMQXAConnectionFactory("vm://localhost?broker.persistent=false");
XAConnection connection1 = (XAConnection)cf1.createConnection();
connection1.start();
XASession session = connection1.createXASession();
XAResource resource = session.getXAResource();
Destination dest = new ActiveMQQueue(getName());
// publish a message
Xid tid = createXid();
resource.start(tid, XAResource.TMNOFLAGS);
MessageProducer producer = session.createProducer(dest);
ActiveMQTextMessage message = new ActiveMQTextMessage();
message.setText(getName());
producer.send(message);
producer.close();
resource.end(tid, XAResource.TMSUCCESS);
resource.commit(tid, true);
session.close();
session = connection1.createXASession();
MessageConsumer consumer = session.createConsumer(dest);
tid = createXid();
resource = session.getXAResource();
resource.start(tid, XAResource.TMNOFLAGS);
TextMessage receivedMessage = (TextMessage) consumer.receive(1000);
consumer.close();
assertNotNull(receivedMessage);
assertEquals(getName(), receivedMessage.getText());
resource.end(tid, XAResource.TMSUCCESS);
resource.commit(tid, true);
session = connection1.createXASession();
consumer = session.createConsumer(dest);
tid = createXid();
resource = session.getXAResource();
resource.start(tid, XAResource.TMNOFLAGS);
assertNull(consumer.receive(1000));
resource.end(tid, XAResource.TMSUCCESS);
resource.commit(tid, true);
}
public void testSessionCloseTransactionalSendReceive() throws Exception {
ActiveMQXAConnectionFactory cf1 = new ActiveMQXAConnectionFactory("vm://localhost?broker.persistent=false");
XAConnection connection1 = (XAConnection)cf1.createConnection();
connection1.start();
XASession session = connection1.createXASession();
XAResource resource = session.getXAResource();
Destination dest = new ActiveMQQueue(getName());
// publish a message
Xid tid = createXid();
resource.start(tid, XAResource.TMNOFLAGS);
MessageProducer producer = session.createProducer(dest);
ActiveMQTextMessage message = new ActiveMQTextMessage();
message.setText(getName());
producer.send(message);
session.close();
resource.end(tid, XAResource.TMSUCCESS);
resource.commit(tid, true);
session = connection1.createXASession();
MessageConsumer consumer = session.createConsumer(dest);
tid = createXid();
resource = session.getXAResource();
resource.start(tid, XAResource.TMNOFLAGS);
TextMessage receivedMessage = (TextMessage) consumer.receive(1000);
session.close();
assertNotNull(receivedMessage);
assertEquals(getName(), receivedMessage.getText());
resource.end(tid, XAResource.TMSUCCESS);
resource.commit(tid, true);
session = connection1.createXASession();
consumer = session.createConsumer(dest);
tid = createXid();
resource = session.getXAResource();
resource.start(tid, XAResource.TMNOFLAGS);
assertNull(consumer.receive(1000));
resource.end(tid, XAResource.TMSUCCESS);
resource.commit(tid, true);
}
protected void assertCreateConnection(String uri) throws Exception {
// Start up a broker with a tcp connector.
BrokerService broker = new BrokerService();
@ -161,5 +293,29 @@ public class ActiveMQXAConnectionFactoryTest extends CombinationTestSupport {
assertTrue("Should be an XATopicConnection", connection instanceof XATopicConnection);
assertTrue("Should be an XAQueueConnection", connection instanceof XAQueueConnection);
}
public Xid createXid() throws IOException {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
DataOutputStream os = new DataOutputStream(baos);
os.writeLong(++txGenerator);
os.close();
final byte[] bs = baos.toByteArray();
return new Xid() {
public int getFormatId() {
return 86;
}
public byte[] getGlobalTransactionId() {
return bs;
}
public byte[] getBranchQualifier() {
return bs;
}
};
}
}

View File

@ -652,5 +652,35 @@ public class JMSConsumerTest extends JmsTestSupport {
assertNull(redispatchConsumer.receive(500));
redispatchSession.close();
}
public void testRedispatchOfRolledbackTx() throws Exception {
connection.start();
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
destination = createDestination(session, ActiveMQDestination.QUEUE_TYPE);
sendMessages(connection, destination, 1);
MessageConsumer consumer = session.createConsumer(destination);
assertNotNull(consumer.receive(1000));
// install another consumer while message dispatch is unacked/uncommitted
Session redispatchSession = connection.createSession(true, Session.SESSION_TRANSACTED);
MessageConsumer redispatchConsumer = redispatchSession.createConsumer(destination);
session.rollback();
session.close();
Message msg = redispatchConsumer.receive(1000);
assertNotNull(msg);
assertTrue(msg.getJMSRedelivered());
// should have re-delivery of 2, one for re-dispatch, one for rollback which is a little too much!
assertEquals(3, msg.getLongProperty("JMSXDeliveryCount"));
redispatchSession.commit();
assertNull(redispatchConsumer.receive(500));
redispatchSession.close();
}
}

View File

@ -157,7 +157,8 @@ public class JmsQueueTransactionTest extends JmsTransactionTestSupport {
// Get the first.
assertEquals(outbound[0], consumer.receive(1000));
consumer.close();
session.commit();
QueueBrowser browser = session.createBrowser((Queue)destination);
Enumeration enumeration = browser.getEnumeration();

View File

@ -37,7 +37,7 @@ public class CloseRollbackRedeliveryQueueTest extends EmbeddedBrokerTestSupport
protected int numberOfMessagesOnQueue = 1;
private Connection connection;
public void testVerifyCloseRedeliveryWithFailoverTransport() throws Throwable {
public void testVerifySessionCloseRedeliveryWithFailoverTransport() throws Throwable {
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
MessageConsumer consumer = session.createConsumer(destination);
@ -57,7 +57,46 @@ public class CloseRollbackRedeliveryQueueTest extends EmbeddedBrokerTestSupport
assertEquals("redelivered message", id, message.getJMSMessageID());
assertEquals(3, message.getLongProperty("JMSXDeliveryCount"));
}
public void testVerifyConsumerAndSessionCloseRedeliveryWithFailoverTransport() throws Throwable {
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
MessageConsumer consumer = session.createConsumer(destination);
Message message = consumer.receive(1000);
String id = message.getJMSMessageID();
assertNotNull(message);
LOG.info("got message " + message);
consumer.close();
session.close();
session = connection.createSession(true, Session.SESSION_TRANSACTED);
consumer = session.createConsumer(destination);
message = consumer.receive(1000);
session.commit();
assertNotNull(message);
assertEquals("redelivered message", id, message.getJMSMessageID());
assertEquals(3, message.getLongProperty("JMSXDeliveryCount"));
}
public void testVerifyConsumerCloseSessionRollbackRedeliveryWithFailoverTransport() throws Throwable {
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
MessageConsumer consumer = session.createConsumer(destination);
Message message = consumer.receive(1000);
String id = message.getJMSMessageID();
assertNotNull(message);
LOG.info("got message " + message);
consumer.close();
session.rollback();
consumer = session.createConsumer(destination);
message = consumer.receive(1000);
session.commit();
assertNotNull(message);
assertEquals("redelivered message", id, message.getJMSMessageID());
assertEquals(3, message.getLongProperty("JMSXDeliveryCount"));
}
protected void setUp() throws Exception {
super.setUp();

View File

@ -25,7 +25,9 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
@ -188,6 +190,94 @@ public class MDBTest extends TestCase {
}
public void testMessageExceptionReDelivery() throws Exception {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
Connection connection = factory.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
ActiveMQResourceAdapter adapter = new ActiveMQResourceAdapter();
adapter.setServerUrl("vm://localhost?broker.persistent=false");
adapter.start(new StubBootstrapContext());
final CountDownLatch messageDelivered = new CountDownLatch(2);
final StubMessageEndpoint endpoint = new StubMessageEndpoint() {
public void onMessage(Message message) {
super.onMessage(message);
try {
messageDelivered.countDown();
if (!messageDelivered.await(1, TimeUnit.MILLISECONDS)) {
throw new RuntimeException(getName() + " ex on first delivery");
} else {
try {
assertTrue(message.getJMSRedelivered());
} catch (JMSException e) {
e.printStackTrace();
}
}
} catch (InterruptedException ignored) {
}
};
public void afterDelivery() throws ResourceException {
try {
if (!messageDelivered.await(1, TimeUnit.MILLISECONDS)) {
xaresource.end(xid, XAResource.TMFAIL);
xaresource.rollback(xid);
} else {
xaresource.end(xid, XAResource.TMSUCCESS);
xaresource.prepare(xid);
xaresource.commit(xid, false);
}
} catch (Throwable e) {
throw new ResourceException(e);
}
}
};
ActiveMQActivationSpec activationSpec = new ActiveMQActivationSpec();
activationSpec.setDestinationType(Queue.class.getName());
activationSpec.setDestination("TEST");
activationSpec.setResourceAdapter(adapter);
activationSpec.validate();
MessageEndpointFactory messageEndpointFactory = new MessageEndpointFactory() {
public MessageEndpoint createEndpoint(XAResource resource) throws UnavailableException {
endpoint.xaresource = resource;
return endpoint;
}
public boolean isDeliveryTransacted(Method method) throws NoSuchMethodException {
return true;
}
};
// Activate an Endpoint
adapter.endpointActivation(messageEndpointFactory, activationSpec);
// Give endpoint a chance to setup and register its listeners
try {
Thread.sleep(1000);
} catch (Exception e) {
}
// Send the broker a message to that endpoint
MessageProducer producer = session.createProducer(new ActiveMQQueue("TEST"));
producer.send(session.createTextMessage("Hello!"));
connection.close();
// Wait for the message to be delivered twice.
assertTrue(messageDelivered.await(10000, TimeUnit.MILLISECONDS));
// Shut the Endpoint down.
adapter.endpointDeactivation(messageEndpointFactory, activationSpec);
adapter.stop();
}
public Xid createXid() throws IOException {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
DataOutputStream os = new DataOutputStream(baos);