diff --git a/activemq-core/src/test/java/org/apache/activemq/JmsRollbackRedeliveryTest.java b/activemq-core/src/test/java/org/apache/activemq/JmsRollbackRedeliveryTest.java index fe94b1e02e..729fb11acc 100644 --- a/activemq-core/src/test/java/org/apache/activemq/JmsRollbackRedeliveryTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/JmsRollbackRedeliveryTest.java @@ -24,6 +24,7 @@ import java.util.concurrent.atomic.AtomicInteger; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; +import javax.jms.JMSException; import javax.jms.MessageConsumer; import javax.jms.MessageProducer; import javax.jms.Session; @@ -37,39 +38,30 @@ import org.apache.commons.logging.LogFactory; public class JmsRollbackRedeliveryTest extends TestCase { protected static final Log LOG = LogFactory.getLog(JmsRollbackRedeliveryTest.class); - private boolean consumerClose = true; - - public void testRedeliveryNoConsumerClose() throws Exception { - consumerClose = false; - testRedelivery(); + final int nbMessages = 10; + final String destinationName = "Destination"; + boolean consumerClose = true; + boolean rollback = true; + + public void setUp() throws Exception { + BrokerService broker = new BrokerService(); + broker.setPersistent(false); + broker.setUseJmx(false); + broker.start(); } + public void testRedelivery() throws Exception { final int nbMessages = 10; final String destinationName = "Destination"; - BrokerService broker = new BrokerService(); - broker.setPersistent(false); - broker.setUseJmx(false); - broker.start(); - ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost?jms.redeliveryPolicy.maximumRedeliveries=100"); Connection connection = connectionFactory.createConnection(); connection.start(); - // Enqueue nbMessages messages - { - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - Destination destination = session.createQueue(destinationName); - MessageProducer producer = session.createProducer(destination); - for (int i = 1; i <= nbMessages; i++) { - producer.send(session.createTextMessage("")); - } - producer.close(); - session.close(); - } + populateDestination(nbMessages, destinationName, connection); // Consume messages and rollback transactions { @@ -83,17 +75,126 @@ public class JmsRollbackRedeliveryTest extends TestCase { if (msg != null) { if (msg != null && rolledback.put(msg.getText(), Boolean.TRUE) != null) { LOG.info("Received message " + msg.getText() + " (" + received.getAndIncrement() + ")" + msg.getJMSMessageID()); + assertTrue(msg.getJMSRedelivered()); session.commit(); } else { LOG.info("Rollback message " + msg.getText() + " id: " + msg.getJMSMessageID()); session.rollback(); } } - if (consumerClose ) { - consumer.close(); + consumer.close(); + session.close(); + } + } + } + + + public void testRedeliveryOnSingleConsumer() throws Exception { + + ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost"); + Connection connection = connectionFactory.createConnection(); + connection.start(); + + populateDestination(nbMessages, destinationName, connection); + + // Consume messages and rollback transactions + { + AtomicInteger received = new AtomicInteger(); + Map rolledback = new ConcurrentHashMap(); + Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); + Destination destination = session.createQueue(destinationName); + MessageConsumer consumer = session.createConsumer(destination); + while (received.get() < nbMessages) { + TextMessage msg = (TextMessage) consumer.receive(6000000); + if (msg != null) { + if (msg != null && rolledback.put(msg.getText(), Boolean.TRUE) != null) { + LOG.info("Received message " + msg.getText() + " (" + received.getAndIncrement() + ")" + msg.getJMSMessageID()); + assertTrue(msg.getJMSRedelivered()); + session.commit(); + } else { + LOG.info("Rollback message " + msg.getText() + " id: " + msg.getJMSMessageID()); + session.rollback(); + } + } + } + consumer.close(); + session.close(); + } + } + + public void testRedeliveryOnSingleSession() throws Exception { + + ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost?jms.redeliveryPolicy.maximumRedeliveries=100"); + Connection connection = connectionFactory.createConnection(); + connection.start(); + + populateDestination(nbMessages, destinationName, connection); + + // Consume messages and rollback transactions + { + AtomicInteger received = new AtomicInteger(); + Map rolledback = new ConcurrentHashMap(); + Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); + Destination destination = session.createQueue(destinationName); + while (received.get() < nbMessages) { + MessageConsumer consumer = session.createConsumer(destination); + TextMessage msg = (TextMessage) consumer.receive(6000000); + if (msg != null) { + if (msg != null && rolledback.put(msg.getText(), Boolean.TRUE) != null) { + LOG.info("Received message " + msg.getText() + " (" + received.getAndIncrement() + ")" + msg.getJMSMessageID()); + assertTrue(msg.getJMSRedelivered()); + session.commit(); + } else { + LOG.info("Rollback message " + msg.getText() + " id: " + msg.getJMSMessageID()); + session.rollback(); + } + } + consumer.close(); + } + session.close(); + } + } + + public void testRedeliveryOnSessionCloseWithNoRollback() throws Exception { + + ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost?jms.redeliveryPolicy.maximumRedeliveries=100"); + Connection connection = connectionFactory.createConnection(); + connection.start(); + + populateDestination(nbMessages, destinationName, connection); + + { + AtomicInteger received = new AtomicInteger(); + Map rolledback = new ConcurrentHashMap(); + while (received.get() < nbMessages) { + Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); + Destination destination = session.createQueue(destinationName); + + MessageConsumer consumer = session.createConsumer(destination); + TextMessage msg = (TextMessage) consumer.receive(1000); + if (msg != null) { + if (msg != null && rolledback.put(msg.getText(), Boolean.TRUE) != null) { + LOG.info("Received message " + msg.getText() + " (" + received.getAndIncrement() + ")" + msg.getJMSMessageID()); + assertTrue(msg.getJMSRedelivered()); + session.commit(); + } } session.close(); } } } + + private void populateDestination(final int nbMessages, + final String destinationName, Connection connection) + throws JMSException { + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Destination destination = session.createQueue(destinationName); + MessageProducer producer = session.createProducer(destination); + for (int i = 1; i <= nbMessages; i++) { + producer.send(session.createTextMessage("")); + } + producer.close(); + session.close(); + } + }