This example shows you how to make an XA heuristic decision through the ActiveMQ Management Interface.
A heuristic decision is a unilateral decision to commit or rollback an XA transaction branch after it has been prepared.
In this example we simulate a transaction manager to control the transactions. First we create an XASession and enlist it in a transaction through its XAResource. We then send a text message, 'hello', and end/prepare the transaction on the XAResource, but neither commit nor roll back the transaction. Another transaction is created and associated with the same XAResource, and a second message, 'world', is sent on behalf of the second transaction. Again we leave the second transaction in the prepared state. Then we get the MBeanServerConnection object to manipulate the prepared transactions. To illustrate, we roll back the first transaction but commit the second. As a result, only the message 'world' is received.
This example uses JMX to manipulate transactions in an ActiveMQ Server. For details on JMX facilities with ActiveMQ, please look at the JMX Example.
To run the example, simply type mvn verify -Pexample
from this directory
The initial context used by the example gets its properties from the client-jndi.properties
file in the directory ../common/config
// Obtain the JNDI initial context from the getContext helper (defined outside this snippet).
InitialContext initialContext = getContext(0);
// Look up the JMS queue bound at /queue/exampleQueue.
Queue queue = (Queue) initialContext.lookup("/queue/exampleQueue");
// Look up the XA connection factory bound at /XAConnectionFactory.
XAConnectionFactory cf = (XAConnectionFactory) initialContext.lookup("/XAConnectionFactory");
// Create the XA connection; the 'connection' variable is declared outside this snippet.
connection = cf.createXAConnection();
// Start the connection so that messages can be delivered to consumers.
connection.start();
// XA session whose XAResource is used to drive the transactions.
XASession xaSession = connection.createXASession();
// A separate non-transacted, auto-acknowledge session used only for receiving.
Session normalSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// Consume from the example queue on the non-transacted session.
MessageConsumer normalConsumer = normalSession.createConsumer(queue);
// Hand received messages to a SimpleMessageListener (defined outside this snippet).
normalConsumer.setMessageListener(new SimpleMessageListener());
// The producer is created from the XA session's underlying Session.
Session session = xaSession.getSession();
MessageProducer producer = session.createProducer(queue);
// The two text messages sent by the example, one per transaction.
TextMessage helloMessage = session.createTextMessage("hello");
TextMessage worldMessage = session.createTextMessage("world");
// Build the Xid for the first transaction: branch qualifier "xa-example1", format id 1,
// and a freshly generated UUID as the global transaction id.
Xid xid1 = new XidImpl("xa-example1".getBytes(), 1, UUIDGenerator.getInstance().generateStringUUID().getBytes());
// Get the XAResource of the XA session; this code plays the role of the transaction manager.
XAResource xaRes = xaSession.getXAResource();
// Begin work on behalf of the first transaction.
xaRes.start(xid1, XAResource.TMNOFLAGS);
// Send 'hello' with the producer created from the XA session, so the send belongs to xid1.
// (The non-existent 'normalProducer' was referenced here before.)
producer.send(helloMessage);
// End and prepare the transaction, deliberately neither committing nor rolling back.
xaRes.end(xid1, XAResource.TMSUCCESS);
xaRes.prepare(xid1);
// The transaction is only prepared, so the listener must not have received anything yet.
checkNoMessageReceived();
// Build the Xid for the second transaction: branch qualifier "xa-example2", format id 1,
// and a freshly generated UUID as the global transaction id.
Xid xid2 = new XidImpl("xa-example2".getBytes(), 1, UUIDGenerator.getInstance().generateStringUUID().getBytes());
// Begin work on behalf of the second transaction, on the same XAResource.
xaRes.start(xid2, XAResource.TMNOFLAGS);
// Send 'world' while the second transaction is active.
producer.send(worldMessage);
// End and prepare the second transaction, again without committing or rolling back.
xaRes.end(xid2, XAResource.TMSUCCESS);
xaRes.prepare(xid2);
// Both transactions are only prepared, so still no message should have been received.
checkNoMessageReceived();
// Connect to the JMX service at JMX_URL (constant defined outside this snippet),
// passing an empty, properly typed environment map instead of a raw HashMap.
JMXConnector connector = JMXConnectorFactory.connect(new JMXServiceURL(JMX_URL), new HashMap<String, Object>());
// Connection through which the server's management operations are invoked.
MBeanServerConnection mbsc = connector.getMBeanServerConnection();
// Object name of the messaging server MBean.
ObjectName serverObject = ObjectNameBuilder.DEFAULT.getMessagingServerObjectName();
// List the prepared transactions; the operation takes no arguments.
String[] infos = (String[])mbsc.invoke(serverObject, "listPreparedTransactions", null, null);
System.out.println("Prepared transactions: ");
for (String i : infos)
{
System.out.println(i);
}
// Heuristically roll back the first transaction, identified by the Base64 form of its Xid.
mbsc.invoke(serverObject, "rollbackPreparedTransaction", new String[] {XidImpl.toBase64String(xid1)}, new String[]{"java.lang.String"});
// Heuristically commit the second transaction.
mbsc.invoke(serverObject, "commitPreparedTransaction", new String[] {XidImpl.toBase64String(xid2)}, new String[]{"java.lang.String"});
// Only the message from the committed transaction is expected to arrive.
checkMessageReceived("world");
// List the prepared transactions again and print how many remain.
infos = (String[])mbsc.invoke(serverObject, "listPreparedTransactions", null, null);
System.out.println("No. of prepared transactions now: " + infos.length);
// Close the JMX connector.
connector.close();
Finally, close the initial context and the JMS connection in a finally
block. Closing a JMS connection will automatically close all of its sessions, consumers, producers and browser objects
// Cleanup clause; the matching try block is not shown in this snippet.
finally
{
// Close the JNDI initial context if it was created.
if (initialContext != null)
{
initialContext.close();
}
// Close the JMS connection if it was created.
if (connection != null)
{
connection.close();
}
}