diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/RedeliveryPolicyTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/RedeliveryPolicyTest.java index 659e9827e3..ac81a1f410 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/RedeliveryPolicyTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/RedeliveryPolicyTest.java @@ -16,6 +16,8 @@ */ package org.apache.activemq; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -615,6 +617,101 @@ public class RedeliveryPolicyTest extends JmsTestSupport { } + public void testRedeliveryRollbackWithDelayBlocking() throws Exception + { + redeliveryRollbackWithDelay(true); + } + + public void testRedeliveryRollbackWithDelayNonBlocking() throws Exception + { + redeliveryRollbackWithDelay(false); + } + + public void redeliveryRollbackWithDelay(final boolean blockingRedelivery) throws Exception { + + connection.start(); + Session sendSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + ActiveMQQueue destination = new ActiveMQQueue("TEST"); + MessageProducer producer = sendSession.createProducer(destination); + producer.send(sendSession.createTextMessage("1st")); + producer.send(sendSession.createTextMessage("2nd")); + + + connection = (ActiveMQConnection)factory.createConnection(userName, password); + connections.add(connection); + + RedeliveryPolicy policy = connection.getRedeliveryPolicy(); + policy.setInitialRedeliveryDelay(2000); + policy.setUseExponentialBackOff(false); + connection.setNonBlockingRedelivery(blockingRedelivery); + connection.start(); + final CountDownLatch done = new CountDownLatch(3); + + final ActiveMQSession session = (ActiveMQSession) connection.createSession(true, Session.SESSION_TRANSACTED); + final List list = new ArrayList<>(); + session.setMessageListener(new MessageListener() { + @Override + public void onMessage(Message message) { + try { + ActiveMQTextMessage m = (ActiveMQTextMessage) message; + LOG.info("Got: " + ((ActiveMQTextMessage) message).getMessageId() + ", seq:" + ((ActiveMQTextMessage) message).getMessageId().getBrokerSequenceId()); + list.add(((ActiveMQTextMessage) message).getText()); + if (done.getCount() == 3) + { + session.rollback(); + } + done.countDown(); + + } catch (Exception ignored) { + ignored.printStackTrace(); + } + } + }); + + connection.createConnectionConsumer( + destination, + null, + new ServerSessionPool() { + @Override + public ServerSession getServerSession() throws JMSException { + return new ServerSession() { + @Override + public Session getSession() throws JMSException { + return session; + } + + @Override + public void start() throws JMSException { + } + }; + } + }, + 100, + false); + + Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + session.run(); + return done.await(10, TimeUnit.MILLISECONDS); + } + }, 5000); + + connection.close(); + connections.remove(connection); + + assertEquals(list.size(), 3); + if (blockingRedelivery) { + assertEquals("1st", list.get(0)); + assertEquals("2nd", list.get(1)); + assertEquals("1st", list.get(2)); + } else { + assertEquals("1st", list.get(0)); + assertEquals("1st", list.get(1)); + assertEquals("2nd", list.get(2)); + } + } + public void testInitialRedeliveryDelayZero() throws Exception { // Receive a message with the JMS API