From 94ca7039b1a18d6a7b4f4626b410d0bf826c4275 Mon Sep 17 00:00:00 2001 From: Timothy Bish Date: Wed, 22 Jun 2016 13:02:14 -0400 Subject: [PATCH] Add a rollback test that rolls back repeatedly using new connections. --- .../amqp/JMSClientTransactionTest.java | 60 ++++++++++++++++++- 1 file changed, 57 insertions(+), 3 deletions(-) diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTransactionTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTransactionTest.java index e979714444..1251410c2b 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTransactionTest.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTransactionTest.java @@ -22,6 +22,7 @@ import static org.junit.Assert.assertTrue; import java.util.concurrent.atomic.AtomicInteger; +import javax.jms.Connection; import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.JMSException; @@ -107,7 +108,7 @@ public class JMSClientTransactionTest extends JMSClientTestSupport { TextMessage message = (TextMessage) messageConsumer.receive(1000); if (message != null) { counter++; - LOG.info("Message n. {} with content '{}' has been recieved.", counter,message.getText()); + LOG.info("Message n. {} with content '{}' has been recieved.", counter, message.getText()); session.commit(); LOG.info("Transaction has been committed."); } @@ -144,7 +145,7 @@ public class JMSClientTransactionTest extends JMSClientTestSupport { TextMessage message = (TextMessage) messageConsumer.receive(1000); if (message != null) { counter++; - LOG.info("Message n. {} with content '{}' has been recieved.", counter,message.getText()); + LOG.info("Message n. {} with content '{}' has been recieved.", counter, message.getText()); } } while (counter < MSG_COUNT); @@ -199,7 +200,7 @@ public class JMSClientTransactionTest extends JMSClientTestSupport { LOG.info("Trying to receive message: {}", i); TextMessage message = (TextMessage) consumer.receive(1000); assertNotNull("Message " + i + " should be available", message); - assertEquals("Should get message: " + i, i , message.getIntProperty("MessageSequence")); + assertEquals("Should get message: " + i, i, message.getIntProperty("MessageSequence")); } session.commit(); @@ -295,4 +296,57 @@ public class JMSClientTransactionTest extends JMSClientTestSupport { } })); } + + @Test + public void testMessageOrderAfterRollback() throws Exception { + sendMessages(5); + + int counter = 0; + while (counter++ < 10) { + connection = createConnection(); + connection.start(); + + Session session = connection.createSession(true, -1); + Queue queue = session.createQueue(getDestinationName()); + MessageConsumer consumer = session.createConsumer(queue); + + Message message = consumer.receive(5000); + assertNotNull(message); + assertTrue(message instanceof TextMessage); + + int sequenceID = message.getIntProperty("sequenceID"); + assertEquals(0, sequenceID); + + LOG.info("Read message = {}", ((TextMessage) message).getText()); + session.rollback(); + session.close(); + connection.close(); + } + } + + public void sendMessages(int messageCount) throws JMSException { + Connection connection = null; + try { + connection = createConnection(); + connection.start(); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue(getDestinationName()); + + for (int i = 0; i < messageCount; ++i) { + MessageProducer messageProducer = session.createProducer(queue); + TextMessage message = session.createTextMessage("(" + i + ")"); + message.setIntProperty("sequenceID", i); + messageProducer.send(message); + LOG.info("Sent message = {}", message.getText()); + } + + } catch (Exception exp) { + exp.printStackTrace(System.out); + } finally { + if (connection != null) { + connection.close(); + } + } + } }