This example demonstrates receiving a message within the scope of an XA transaction. When using an XA transaction the message will only be acknowledged and removed from the queue when the transaction is committed. If the transaction is not committed the message maybe redelivered after rollback or during XA recovery.
ActiveMQ is JTA aware, meaning you can use ActiveMQ in an XA transactional environment and participate in XA transactions. It provides the javax.transaction.xa.XAResource interface for that purpose. Users can get a XAConnectionFactory to create XAConnections and XASessions.
In this example we simulate a transaction manager to control the transactions. First we create an XASession for receiving and a normal session for sending. Then we start a new xa transaction and enlist the receiving XASession through its XAResource. We then send two words, 'hello' and 'world', receive them, and let the transaction roll back. The received messages are cancelled back to the queue. Next we start a new transaction with the same XAResource enlisted, but this time we commit the transaction after receiving the messages. Then we check that no more messages are to be received.
To run the example, simply type mvn verify -Pexample
from this directory
client-jndi.properties
file in the directory ../common/config
InitialContext initialContext = getContext(0);
Queue queue = (Queue) initialContext.lookup("/queue/exampleQueue");
XAConnectionFactory cf = (XAConnectionFactory) initialContext.lookup("/XAConnectionFactory");
connection = cf.createXAConnection();
connection.start();
XASession xaSession = connection.createXASession();
Session normalSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer normalProducer = normalSession.createProducer(queue);
Session session = xaSession.getSession();
MessageConsumer xaConsumer = session.createConsumer(queue);
TextMessage helloMessage = session.createTextMessage("hello");
TextMessage worldMessage = session.createTextMessage("world");
Xid xid1 = new XidImpl("xa-example1".getBytes(), 1, UUIDGenerator.getInstance().generateStringUUID().getBytes());
XAResource xaRes = xaSession.getXAResource();
xaRes.start(xid1, XAResource.TMNOFLAGS);
normalProducer.send(helloMessage);
normalProducer.send(worldMessage);
TextMessage rm1 = (TextMessage)xaConsumer.receive();
System.out.println("Message received: " + rm1.getText());
TextMessage rm2 = (TextMessage)xaConsumer.receive();
System.out.println("Message received: " + rm2.getText());
xaRes.end(xid1, XAResource.TMSUCCESS);
xaRes.prepare(xid1);
xaRes.rollback(xid1);
Xid xid2 = new XidImpl("xa-example2".getBytes(), 1, UUIDGenerator.getInstance().generateStringUUID().getBytes());
xaRes.start(xid2, XAResource.TMNOFLAGS);
rm1 = (TextMessage)xaConsumer.receive();
System.out.println("Message received again: " + rm1.getText());
rm2 = (TextMessage)xaConsumer.receive();
System.out.println("Message received again: " + rm2.getText());
xaRes.end(xid2, XAResource.TMSUCCESS);
xaRes.prepare(xid2);
xaRes.commit(xid2, false);
TextMessage rm3 = (TextMessage)xaConsumer.receive(2000);
if (rm3 == null)
{
System.out.println("No message received after commit.");
}
else
{
result = false;
}
finally
block. Closing a JMS connection will automatically close all of its sessions, consumers, producer and browser objects
finally
{
if (initialContext != null)
{
initialContext.close();
}
if (connection != null)
{
connection.close();
}
}