AMQ-2078 extend transaction tests to xa in ra

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@735912 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
David Jencks 2009-01-20 02:06:37 +00:00
parent 2deea18cc2
commit 1f00a395f8
4 changed files with 301 additions and 48 deletions

View File

@ -56,18 +56,21 @@ public class JmsQueueTransactionTest extends JmsTransactionTestSupport {
Message[] outbound = new Message[] {session.createTextMessage("First Message"), session.createTextMessage("Second Message")};
// lets consume any outstanding messages from previous test runs
beginTx();
while (consumer.receive(1000) != null) {
}
session.commit();
commitTx();
beginTx();
producer.send(outbound[0]);
producer.send(outbound[1]);
session.commit();
commitTx();
LOG.info("Sent 0: " + outbound[0]);
LOG.info("Sent 1: " + outbound[1]);
ArrayList<Message> messages = new ArrayList<Message>();
beginTx();
Message message = consumer.receive(1000);
assertEquals(outbound[0], message);
@ -80,6 +83,7 @@ public class JmsQueueTransactionTest extends JmsTransactionTestSupport {
// Consume again.. the previous message should
// get redelivered.
beginTx();
message = consumer.receive(5000);
assertNotNull("Should have re-received the first message again!", message);
messages.add(message);
@ -89,7 +93,7 @@ public class JmsQueueTransactionTest extends JmsTransactionTestSupport {
assertNotNull("Should have re-received the second message again!", message);
messages.add(message);
assertEquals(outbound[1], message);
session.commit();
commitTx();
Message inbound[] = new Message[messages.size()];
messages.toArray(inbound);
@ -111,24 +115,28 @@ public class JmsQueueTransactionTest extends JmsTransactionTestSupport {
// Session that sends messages
{
Session session = resourceProvider.createSession(connection);
this.session = session;
MessageProducer producer = resourceProvider.createProducer(session, destination);
// consumer = resourceProvider.createConsumer(session,
// destination);
beginTx();
producer.send(session.createTextMessage("Test Message: " + i));
session.commit();
commitTx();
session.close();
}
// Session that consumes messages
{
Session session = resourceProvider.createSession(connection);
this.session = session;
MessageConsumer consumer = resourceProvider.createConsumer(session, destination);
beginTx();
TextMessage message = (TextMessage)consumer.receive(1000 * 5);
assertNotNull("Received only " + i + " messages in batch ", message);
assertEquals("Test Message: " + i, message.getText());
session.commit();
commitTx();
session.close();
}
}
@ -145,20 +153,24 @@ public class JmsQueueTransactionTest extends JmsTransactionTestSupport {
Message[] outbound = new Message[] {session.createTextMessage("First Message"), session.createTextMessage("Second Message"), session.createTextMessage("Third Message")};
// lets consume any outstanding messages from previous test runs
beginTx();
while (consumer.receive(1000) != null) {
}
session.commit();
commitTx();
beginTx();
producer.send(outbound[0]);
producer.send(outbound[1]);
producer.send(outbound[2]);
session.commit();
commitTx();
// Get the first.
beginTx();
assertEquals(outbound[0], consumer.receive(1000));
consumer.close();
session.commit();
commitTx();
beginTx();
QueueBrowser browser = session.createBrowser((Queue)destination);
Enumeration enumeration = browser.getEnumeration();
@ -187,7 +199,7 @@ public class JmsQueueTransactionTest extends JmsTransactionTestSupport {
assertEquals(outbound[2], consumer.receive(1000));
consumer.close();
session.commit();
commitTx();
}
}

View File

@ -85,11 +85,31 @@ public abstract class JmsTransactionTestSupport extends TestSupport implements M
resourceProvider = getJmsResourceProvider();
topic = resourceProvider.isTopic();
// We will be using transacted sessions.
resourceProvider.setTransacted(true);
connectionFactory = resourceProvider.createConnectionFactory();
setSessionTransacted();
connectionFactory = newConnectionFactory();
reconnect();
}
protected void setSessionTransacted() {
resourceProvider.setTransacted(true);
}
protected ConnectionFactory newConnectionFactory() throws Exception {
return resourceProvider.createConnectionFactory();
}
protected void beginTx() throws Exception {
//no-op for local tx
}
protected void commitTx() throws Exception {
session.commit();
}
protected void rollbackTx() throws Exception {
session.rollback();
}
/**
*/
protected BrokerService createBroker() throws Exception, URISyntaxException {
@ -124,24 +144,25 @@ public abstract class JmsTransactionTestSupport extends TestSupport implements M
public void testSendReceiveTransactedBatches() throws Exception {
TextMessage message = session.createTextMessage("Batch Message");
for (int j = 0; j < batchCount; j++) {
LOG.info("Producing bacth " + j + " of " + batchSize + " messages");
beginTx();
for (int i = 0; i < batchSize; i++) {
producer.send(message);
}
messageSent();
session.commit();
commitTx();
LOG.info("Consuming bacth " + j + " of " + batchSize + " messages");
beginTx();
for (int i = 0; i < batchSize; i++) {
message = (TextMessage)consumer.receive(1000 * 5);
assertNotNull("Received only " + i + " messages in batch " + j, message);
assertEquals("Batch Message", message.getText());
}
session.commit();
commitTx();
}
}
@ -158,18 +179,22 @@ public abstract class JmsTransactionTestSupport extends TestSupport implements M
Message[] outbound = new Message[] {session.createTextMessage("First Message"), session.createTextMessage("Second Message")};
// sends a message
beginTx();
producer.send(outbound[0]);
session.commit();
commitTx();
// sends a message that gets rollbacked
beginTx();
producer.send(session.createTextMessage("I'm going to get rolled back."));
session.rollback();
rollbackTx();
// sends a message
beginTx();
producer.send(outbound[1]);
session.commit();
commitTx();
// receives the first message
beginTx();
ArrayList<Message> messages = new ArrayList<Message>();
LOG.info("About to consume message 1");
Message message = consumer.receive(1000);
@ -183,26 +208,58 @@ public abstract class JmsTransactionTestSupport extends TestSupport implements M
LOG.info("Received: " + message);
// validates that the rollbacked was not consumed
session.commit();
commitTx();
Message inbound[] = new Message[messages.size()];
messages.toArray(inbound);
assertTextMessagesEqual("Rollback did not work.", outbound, inbound);
}
/**
* spec section 3.6 acking a message with automation acks has no effect.
* @throws Exception
*/
public void testAckMessageInTx() throws Exception {
Message[] outbound = new Message[] {session.createTextMessage("First Message")};
// sends a message
beginTx();
producer.send(outbound[0]);
outbound[0].acknowledge();
commitTx();
outbound[0].acknowledge();
// receives the first message
beginTx();
ArrayList<Message> messages = new ArrayList<Message>();
LOG.info("About to consume message 1");
Message message = consumer.receive(1000);
messages.add(message);
LOG.info("Received: " + message);
// validates that the rollbacked was not consumed
commitTx();
Message inbound[] = new Message[messages.size()];
messages.toArray(inbound);
assertTextMessagesEqual("Message not delivered.", outbound, inbound);
}
/**
* Sends a batch of messages and validates that the message sent before
* session close is not consumed.
*
*
* This test only works with local transactions, not xa.
* @throws Exception
*/
public void testSendSessionClose() throws Exception {
Message[] outbound = new Message[] {session.createTextMessage("First Message"), session.createTextMessage("Second Message")};
// sends a message
beginTx();
producer.send(outbound[0]);
session.commit();
commitTx();
// sends a message that gets rollbacked
beginTx();
producer.send(session.createTextMessage("I'm going to get rolled back."));
consumer.close();
@ -210,11 +267,12 @@ public abstract class JmsTransactionTestSupport extends TestSupport implements M
// sends a message
producer.send(outbound[1]);
session.commit();
commitTx();
// receives the first message
ArrayList<Message> messages = new ArrayList<Message>();
LOG.info("About to consume message 1");
beginTx();
Message message = consumer.receive(1000);
messages.add(message);
LOG.info("Received: " + message);
@ -226,7 +284,7 @@ public abstract class JmsTransactionTestSupport extends TestSupport implements M
LOG.info("Received: " + message);
// validates that the rollbacked was not consumed
session.commit();
commitTx();
Message inbound[] = new Message[messages.size()];
messages.toArray(inbound);
assertTextMessagesEqual("Rollback did not work.", outbound, inbound);
@ -242,10 +300,12 @@ public abstract class JmsTransactionTestSupport extends TestSupport implements M
Message[] outbound = new Message[] {session.createTextMessage("First Message"), session.createTextMessage("Second Message")};
// sends a message
beginTx();
producer.send(outbound[0]);
session.commit();
commitTx();
// sends a message that gets rollbacked
beginTx();
producer.send(session.createTextMessage("I'm going to get rolled back."));
consumer.close();
session.close();
@ -253,12 +313,14 @@ public abstract class JmsTransactionTestSupport extends TestSupport implements M
reconnect();
// sends a message
beginTx();
producer.send(outbound[1]);
session.commit();
commitTx();
// receives the first message
ArrayList<Message> messages = new ArrayList<Message>();
LOG.info("About to consume message 1");
beginTx();
Message message = consumer.receive(1000);
messages.add(message);
LOG.info("Received: " + message);
@ -270,7 +332,7 @@ public abstract class JmsTransactionTestSupport extends TestSupport implements M
LOG.info("Received: " + message);
// validates that the rollbacked was not consumed
session.commit();
commitTx();
Message inbound[] = new Message[messages.size()];
messages.toArray(inbound);
assertTextMessagesEqual("Rollback did not work.", outbound, inbound);
@ -286,36 +348,41 @@ public abstract class JmsTransactionTestSupport extends TestSupport implements M
Message[] outbound = new Message[] {session.createTextMessage("First Message"), session.createTextMessage("Second Message")};
// lets consume any outstanding messages from prev test runs
beginTx();
while (consumer.receive(1000) != null) {
}
session.commit();
commitTx();
// sent both messages
beginTx();
producer.send(outbound[0]);
producer.send(outbound[1]);
session.commit();
commitTx();
LOG.info("Sent 0: " + outbound[0]);
LOG.info("Sent 1: " + outbound[1]);
ArrayList<Message> messages = new ArrayList<Message>();
beginTx();
Message message = consumer.receive(1000);
messages.add(message);
assertEquals(outbound[0], message);
session.commit();
commitTx();
// rollback so we can get that last message again.
beginTx();
message = consumer.receive(1000);
assertNotNull(message);
assertEquals(outbound[1], message);
session.rollback();
rollbackTx();
// Consume again.. the prev message should
// get redelivered.
beginTx();
message = consumer.receive(5000);
assertNotNull("Should have re-received the message again!", message);
messages.add(message);
session.commit();
commitTx();
Message inbound[] = new Message[messages.size()];
messages.toArray(inbound);
@ -332,29 +399,33 @@ public abstract class JmsTransactionTestSupport extends TestSupport implements M
Message[] outbound = new Message[] {session.createTextMessage("First Message"), session.createTextMessage("Second Message")};
// lets consume any outstanding messages from prev test runs
beginTx();
while (consumer.receive(1000) != null) {
}
session.commit();
commitTx();
//
beginTx();
producer.send(outbound[0]);
producer.send(outbound[1]);
session.commit();
commitTx();
LOG.info("Sent 0: " + outbound[0]);
LOG.info("Sent 1: " + outbound[1]);
ArrayList<Message> messages = new ArrayList<Message>();
beginTx();
Message message = consumer.receive(1000);
assertEquals(outbound[0], message);
message = consumer.receive(1000);
assertNotNull(message);
assertEquals(outbound[1], message);
session.rollback();
rollbackTx();
// Consume again.. the prev message should
// get redelivered.
beginTx();
message = consumer.receive(5000);
assertNotNull("Should have re-received the first message again!", message);
messages.add(message);
@ -365,7 +436,7 @@ public abstract class JmsTransactionTestSupport extends TestSupport implements M
assertEquals(outbound[1], message);
assertNull(consumer.receiveNoWait());
session.commit();
commitTx();
Message inbound[] = new Message[messages.size()];
messages.toArray(inbound);
@ -383,13 +454,15 @@ public abstract class JmsTransactionTestSupport extends TestSupport implements M
Message[] outbound = new Message[] {session.createTextMessage("First Message"), session.createTextMessage("Second Message"), session.createTextMessage("Third Message"),
session.createTextMessage("Fourth Message")};
beginTx();
for (int i = 0; i < outbound.length; i++) {
// sends a message
producer.send(outbound[i]);
}
session.commit();
commitTx();
// receives the first message
beginTx();
for (int i = 0; i < outbound.length; i++) {
LOG.info("About to consume message 1");
Message message = consumer.receive(1000);
@ -398,7 +471,7 @@ public abstract class JmsTransactionTestSupport extends TestSupport implements M
}
// validates that the rollbacked was not consumed
session.commit();
commitTx();
}
/**
@ -446,33 +519,37 @@ public abstract class JmsTransactionTestSupport extends TestSupport implements M
TextMessage[] outbound = new TextMessage[] {session.createTextMessage("First Message"), session.createTextMessage("Second Message")};
// lets consume any outstanding messages from prev test runs
beginTx();
while (consumer.receiveNoWait() != null) {
}
session.commit();
commitTx();
// sends the messages
beginTx();
producer.send(outbound[0]);
producer.send(outbound[1]);
session.commit();
commitTx();
LOG.info("Sent 0: " + outbound[0]);
LOG.info("Sent 1: " + outbound[1]);
beginTx();
TextMessage message = (TextMessage)consumer.receive(1000);
assertEquals(outbound[0].getText(), message.getText());
// Close the consumer before the commit. This should not cause the
// received message
// to rollback.
consumer.close();
session.commit();
commitTx();
// Create a new consumer
consumer = resourceProvider.createConsumer(session, destination);
LOG.info("Created consumer: " + consumer);
beginTx();
message = (TextMessage)consumer.receive(1000);
assertEquals(outbound[1].getText(), message.getText());
session.commit();
commitTx();
}
public void testChangeMutableObjectInObjectMessageThenRollback() throws Exception {
@ -481,10 +558,12 @@ public abstract class JmsTransactionTestSupport extends TestSupport implements M
Message outbound = session.createObjectMessage(list);
outbound.setStringProperty("foo", "abc");
beginTx();
producer.send(outbound);
session.commit();
commitTx();
LOG.info("About to consume message 1");
beginTx();
Message message = consumer.receive(5000);
List<String> body = assertReceivedObjectMessageWithListBody(message);
@ -498,12 +577,13 @@ public abstract class JmsTransactionTestSupport extends TestSupport implements M
}
body.clear();
body.add("This should never be seen!");
session.rollback();
rollbackTx();
beginTx();
message = consumer.receive(5000);
List<String> secondBody = assertReceivedObjectMessageWithListBody(message);
assertNotSame("Second call should return a different body", secondBody, body);
session.commit();
commitTx();
}
@SuppressWarnings("unchecked")
@ -526,7 +606,7 @@ public abstract class JmsTransactionTestSupport extends TestSupport implements M
*
* @throws JMSException
*/
protected void reconnect() throws JMSException {
protected void reconnect() throws Exception {
if (connection != null) {
// Close the prev connection.
@ -558,19 +638,24 @@ public abstract class JmsTransactionTestSupport extends TestSupport implements M
* Sets the prefeftch policy to one.
*/
protected void setPrefetchToOne() {
ActiveMQPrefetchPolicy prefetchPolicy = ((ActiveMQConnection)connection).getPrefetchPolicy();
ActiveMQPrefetchPolicy prefetchPolicy = getPrefetchPolicy();
prefetchPolicy.setQueuePrefetch(1);
prefetchPolicy.setTopicPrefetch(1);
prefetchPolicy.setDurableTopicPrefetch(1);
prefetchPolicy.setOptimizeDurableTopicPrefetch(1);
}
protected ActiveMQPrefetchPolicy getPrefetchPolicy() {
return ((ActiveMQConnection)connection).getPrefetchPolicy();
}
//This test won't work with xa tx so no beginTx() has been added.
public void testMessageListener() throws Exception {
// send messages
for (int i = 0; i < MESSAGE_COUNT; i++) {
producer.send(session.createTextMessage(MESSAGE_TEXT + i));
}
session.commit();
commitTx();
consumer.setMessageListener(this);
// wait receive
waitReceiveUnack();
@ -589,7 +674,7 @@ public abstract class JmsTransactionTestSupport extends TestSupport implements M
unackMessages.add(message);
if (unackMessages.size() == MESSAGE_COUNT) {
try {
session.rollback();
rollbackTx();
resendPhase = true;
} catch (Exception e) {
e.printStackTrace();
@ -599,7 +684,7 @@ public abstract class JmsTransactionTestSupport extends TestSupport implements M
ackMessages.add(message);
if (ackMessages.size() == MESSAGE_COUNT) {
try {
session.commit();
commitTx();
} catch (Exception e) {
e.printStackTrace();
}

View File

@ -197,6 +197,9 @@ public class JmsResourceProvider {
*/
public void setTransacted(boolean transacted) {
this.transacted = transacted;
if (transacted) {
setAckMode(Session.SESSION_TRANSACTED);
}
}
/**

View File

@ -0,0 +1,153 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.activemq.ra;
import java.io.IOException;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Session;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;
import javax.resource.spi.ManagedConnection;
import javax.resource.ResourceException;
import org.apache.activemq.*;
import org.apache.activemq.ActiveMQConnectionFactory;
/**
* @version $Rev:$ $Date:$
*/
public class JmsXAQueueTransactionTest extends JmsQueueTransactionTest {
private static final String DEFAULT_HOST = "vm://localhost";
private ConnectionManagerAdapter connectionManager = new ConnectionManagerAdapter();
private ActiveMQManagedConnectionFactory managedConnectionFactory;
private XAResource xaResource;
private static long txGenerator;
private Xid xid;
@Override
protected void setSessionTransacted() {
resourceProvider.setTransacted(false);
resourceProvider.setAckMode(Session.AUTO_ACKNOWLEDGE);
}
@Override
protected ConnectionFactory newConnectionFactory() throws Exception {
managedConnectionFactory = new ActiveMQManagedConnectionFactory();
managedConnectionFactory.setServerUrl(DEFAULT_HOST);
managedConnectionFactory.setUserName(org.apache.activemq.ActiveMQConnectionFactory.DEFAULT_USER);
managedConnectionFactory.setPassword(ActiveMQConnectionFactory.DEFAULT_PASSWORD);
return (ConnectionFactory)managedConnectionFactory.createConnectionFactory(connectionManager);
}
/**
* Recreates the connection.
*
* @throws javax.jms.JMSException
*/
@Override
protected void reconnect() throws Exception {
super.reconnect();
ManagedConnectionProxy proxy = (ManagedConnectionProxy) connection;
ManagedConnection mc = proxy.getManagedConnection();
xaResource = mc.getXAResource();
}
@Override
protected ActiveMQPrefetchPolicy getPrefetchPolicy() {
ManagedConnectionProxy proxy = (ManagedConnectionProxy) connection;
ActiveMQManagedConnection mc = proxy.getManagedConnection();
ActiveMQConnection conn = (ActiveMQConnection) mc.getPhysicalConnection();
return conn.getPrefetchPolicy();
}
@Override
protected void beginTx() throws Exception {
xid = createXid();
xaResource.start(xid, XAResource.TMNOFLAGS);
}
@Override
protected void commitTx() throws Exception {
xaResource.end(xid, XAResource.TMSUCCESS);
int result = xaResource.prepare(xid);
if (result == XAResource.XA_OK) {
xaResource.commit(xid, false);
}
xid = null;
}
@Override
protected void rollbackTx() throws Exception {
xaResource.end(xid, XAResource.TMSUCCESS);
xaResource.rollback(xid);
xid = null;
}
//This test won't work with xa tx it is overridden to do nothing here
@Override
public void testMessageListener() throws Exception {
}
/**
* Sends a batch of messages and validates that the message sent before
* session close is not consumed.
* <p/>
* This test only works with local transactions, not xa. so its commented out here
*
* @throws Exception
*/
@Override
public void testSendSessionClose() throws Exception {
}
public Xid createXid() throws IOException {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
DataOutputStream os = new DataOutputStream(baos);
os.writeLong(++txGenerator);
os.close();
final byte[] bs = baos.toByteArray();
return new Xid() {
public int getFormatId() {
return 86;
}
public byte[] getGlobalTransactionId() {
return bs;
}
public byte[] getBranchQualifier() {
return bs;
}
};
}
}