From 0b6bf7ec1ba9df68ae6492408d482c984849c31c Mon Sep 17 00:00:00 2001 From: "Timothy A. Bish" Date: Mon, 31 Oct 2011 18:54:19 +0000 Subject: [PATCH] Fix for: https://issues.apache.org/jira/browse/AMQ-1853 Tests included. git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1195615 13f79535-47bb-0310-9956-ffa450edef68 --- .../apache/activemq/ActiveMQConnection.java | 11 +- .../activemq/ActiveMQConnectionFactory.java | 18 +- .../activemq/ActiveMQMessageConsumer.java | 66 ++- .../org/apache/activemq/bugs/AMQ1853Test.java | 351 ++++++++++++++++ .../NonBlockingConsumerRedeliveryTest.java | 381 ++++++++++++++++++ 5 files changed, 804 insertions(+), 23 deletions(-) create mode 100644 activemq-core/src/test/java/org/apache/activemq/bugs/AMQ1853Test.java create mode 100644 activemq-core/src/test/java/org/apache/activemq/usecases/NonBlockingConsumerRedeliveryTest.java diff --git a/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java b/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java index 7418235489..9797cb21b9 100755 --- a/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java +++ b/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java @@ -191,8 +191,9 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon protected volatile CountDownLatch transportInterruptionProcessingComplete; private long consumerFailoverRedeliveryWaitPeriod; private final Scheduler scheduler; - private boolean messagePrioritySupported=true; + private boolean messagePrioritySupported = true; private boolean transactedIndividualAck = false; + private boolean nonBlockingRedelivery = false; /** * Construct an ActiveMQConnection @@ -2417,6 +2418,14 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon this.transactedIndividualAck = transactedIndividualAck; } + public boolean isNonBlockingRedelivery() { + return nonBlockingRedelivery; + } + + public void setNonBlockingRedelivery(boolean nonBlockingRedelivery) { + this.nonBlockingRedelivery = nonBlockingRedelivery; + } + /** * Removes any TempDestinations that this connection has cached, ignoring * any exceptions generated because the destination is in use as they should diff --git a/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java b/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java index 8cc9fd7c25..1b9afcddd9 100755 --- a/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java +++ b/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java @@ -120,6 +120,7 @@ public class ActiveMQConnectionFactory extends JNDIBaseStorable implements Conne private ClientInternalExceptionListener clientInternalExceptionListener; private boolean messagePrioritySupported = true; private boolean transactedIndividualAck = false; + private boolean nonBlockingRedelivery = false; // ///////////////////////////////////////////// // @@ -327,6 +328,7 @@ public class ActiveMQConnectionFactory extends JNDIBaseStorable implements Conne connection.setCheckForDuplicates(isCheckForDuplicates()); connection.setMessagePrioritySupported(isMessagePrioritySupported()); connection.setTransactedIndividualAck(isTransactedIndividualAck()); + connection.setNonBlockingRedelivery(isNonBlockingRedelivery()); if (transportListener != null) { connection.addTransportListener(transportListener); } @@ -731,7 +733,7 @@ public class ActiveMQConnectionFactory extends JNDIBaseStorable implements Conne props.setProperty("checkForDuplicates", Boolean.toString(isCheckForDuplicates())); props.setProperty("messagePrioritySupported", Boolean.toString(isMessagePrioritySupported())); props.setProperty("transactedIndividualAck", Boolean.toString(isTransactedIndividualAck())); - + props.setProperty("nonBlockingRedelivery", Boolean.toString(isNonBlockingRedelivery())); } public boolean isUseCompression() { @@ -1058,4 +1060,18 @@ public class ActiveMQConnectionFactory extends JNDIBaseStorable implements Conne this.transactedIndividualAck = transactedIndividualAck; } + + public boolean isNonBlockingRedelivery() { + return nonBlockingRedelivery; + } + + /** + * When true a MessageConsumer will not stop Message delivery before re-delivering Messages + * from a rolled back transaction. This implies that message order will not be preserved and + * also will result in the TransactedIndividualAck option to be enabled. + */ + public void setNonBlockingRedelivery(boolean nonBlockingRedelivery) { + this.nonBlockingRedelivery = nonBlockingRedelivery; + } + } diff --git a/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java b/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java index 0f74b86b3e..a695ea1b87 100755 --- a/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java +++ b/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java @@ -153,6 +153,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC private long optimizeAcknowledgeTimeOut = 0; private long failoverRedeliveryWaitPeriod = 0; private boolean transactedIndividualAck = false; + private boolean nonBlockingRedelivery = false; /** * Create a MessageConsumer @@ -260,7 +261,8 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC } this.info.setOptimizedAcknowledge(this.optimizeAcknowledge); this.failoverRedeliveryWaitPeriod = session.connection.getConsumerFailoverRedeliveryWaitPeriod(); - this.transactedIndividualAck = session.connection.isTransactedIndividualAck(); + this.nonBlockingRedelivery = session.connection.isNonBlockingRedelivery(); + this.transactedIndividualAck = session.connection.isTransactedIndividualAck() || this.nonBlockingRedelivery; if (messageListener != null) { setMessageListener(messageListener); } @@ -579,7 +581,6 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC checkMessageListener(); if (timeout == 0) { return this.receive(); - } sendPullCommand(timeout); @@ -1184,30 +1185,52 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC } // stop the delivery of messages. - unconsumedMessages.stop(); + if (nonBlockingRedelivery) { + if (!unconsumedMessages.isClosed()) { - for (Iterator iter = deliveredMessages.iterator(); iter.hasNext();) { - MessageDispatch md = iter.next(); - unconsumedMessages.enqueueFirst(md); - } + final LinkedList pendingRedeliveries = + new LinkedList(deliveredMessages); - if (redeliveryDelay > 0 && !unconsumedMessages.isClosed()) { - // Start up the delivery again a little later. - scheduler.executeAfterDelay(new Runnable() { - public void run() { - try { - if (started.get()) { - start(); + // Start up the delivery again a little later. + scheduler.executeAfterDelay(new Runnable() { + public void run() { + try { + if (!unconsumedMessages.isClosed()) { + for(MessageDispatch dispatch : pendingRedeliveries) { + session.dispatch(dispatch); + } + } + } catch (Exception e) { + session.connection.onAsyncException(e); } - } catch (JMSException e) { - session.connection.onAsyncException(e); } - } - }, redeliveryDelay); - } else { - start(); - } + }, redeliveryDelay); + } + } else { + unconsumedMessages.stop(); + + for (MessageDispatch md : deliveredMessages) { + unconsumedMessages.enqueueFirst(md); + } + + if (redeliveryDelay > 0 && !unconsumedMessages.isClosed()) { + // Start up the delivery again a little later. + scheduler.executeAfterDelay(new Runnable() { + public void run() { + try { + if (started.get()) { + start(); + } + } catch (JMSException e) { + session.connection.onAsyncException(e); + } + } + }, redeliveryDelay); + } else { + start(); + } + } } deliveredCounter -= deliveredMessages.size(); deliveredMessages.clear(); @@ -1248,6 +1271,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC } } } + /* * called with deliveredMessages locked */ diff --git a/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ1853Test.java b/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ1853Test.java new file mode 100644 index 0000000000..6432e4dd88 --- /dev/null +++ b/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ1853Test.java @@ -0,0 +1,351 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.bugs; + +import static org.junit.Assert.*; + +import java.net.URI; +import java.util.Hashtable; +import java.util.Iterator; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicInteger; + +import javax.jms.DeliveryMode; +import javax.jms.Destination; +import javax.jms.ExceptionListener; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.Session; +import javax.jms.TextMessage; + +import org.apache.activemq.ActiveMQConnection; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.ActiveMQMessageProducer; +import org.apache.activemq.ActiveMQSession; +import org.apache.activemq.RedeliveryPolicy; +import org.apache.activemq.broker.BrokerFactory; +import org.apache.activemq.broker.BrokerService; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Test validates that the AMQ consumer blocks on redelivery of a message, + * through all redeliveries, until the message is either successfully consumed + * or sent to the DLQ. + */ +public class AMQ1853Test { + private static BrokerService broker; + + private static final Logger LOG = LoggerFactory.getLogger(AMQ1853Test.class); + static final String jmsConnectionURI = "failover:(vm://localhost)"; + + // Virtual Topic that the test publishes 10 messages to + private static final String queueFail = "Queue.BlockingConsumer.QueueFail"; + + // Number of messages + + private final int producerMessages = 5; + private final int totalNumberMessages = producerMessages * 2; + private final int maxRedeliveries = 2; + private final int redeliveryDelay = 1000; + + private Map messageList = null; + + @Before + public void setUp() throws Exception { + broker = BrokerFactory.createBroker(new URI("broker:()/localhost?persistent=false")); + broker.setUseJmx(false); + broker.start(); + broker.waitUntilStarted(); + } + + @After + public void tearDown() throws Exception { + if (broker != null) { + broker.stop(); + broker.waitUntilStopped(); + broker = null; + } + } + + @Test + public void testConsumerMessagesAreNotOrdered() throws Exception { + + TestConsumer consumerAllFail = null; + messageList = new Hashtable(); + + try { + + // The first 2 consumers will rollback, ultimately causing messages to land on the DLQ + + TestProducer producerAllFail = new TestProducer(queueFail); + thread(producerAllFail, false); + + consumerAllFail = new TestConsumer(queueFail, true); + thread(consumerAllFail, false); + + // Give the consumers a second to start + Thread.sleep(1000); + + thread(producerAllFail, false); + + // Give the consumers a second to start + Thread.sleep(1000); + + producerAllFail.getLatch().await(); + + LOG.info("producer successful, count = " + producerAllFail.getLatch().getCount()); + + assertTrue("message list size = " + messageList.size(), totalNumberMessages == messageList.size()); + LOG.info("final message list size = " + messageList.size()); + + consumerAllFail.getLatch().await(); + + LOG.info("consumerAllFail successful, count = " + consumerAllFail.getLatch().getCount()); + + Iterator keys = messageList.keySet().iterator(); + for (AtomicInteger counter : messageList.values()) { + String message = keys.next(); + assertTrue("for message " + message + " counter = " + counter.get(), counter.get() == maxRedeliveries + 1); + LOG.info("final count for message " + message + " counter = " + counter.get()); + } + + assertFalse(consumerAllFail.messageReceiptIsOrdered()); + } finally { + if (consumerAllFail != null) { + consumerAllFail.setStop(true); + } + } + } + + private static Thread thread(Runnable runnable, boolean daemon) { + Thread brokerThread = new Thread(runnable); + brokerThread.setDaemon(daemon); + brokerThread.start(); + return brokerThread; + } + + private class TestProducer implements Runnable { + + private CountDownLatch latch = null; + private String destinationName = null; + + public TestProducer(String destinationName) { + this.destinationName = destinationName; + // We run the producer 2 times + latch = new CountDownLatch(totalNumberMessages); + } + + public CountDownLatch getLatch() { + return latch; + } + + public void run() { + + ActiveMQConnectionFactory connectionFactory = null; + ActiveMQConnection connection = null; + ActiveMQSession session = null; + Destination destination = null; + + try { + LOG.info("Started TestProducer for destination (" + destinationName + ")"); + + connectionFactory = new ActiveMQConnectionFactory(jmsConnectionURI); + connection = (ActiveMQConnection) connectionFactory.createConnection(); + connection.setCopyMessageOnSend(false); + connection.start(); + session = (ActiveMQSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + destination = session.createQueue(this.destinationName); + + // Create a MessageProducer from the Session to the Topic or Queue + ActiveMQMessageProducer producer = (ActiveMQMessageProducer) session.createProducer(destination); + producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); + + for (int i = 0; i < (producerMessages); i++) { + TextMessage message = (TextMessage) session.createTextMessage(); + message.setLongProperty("TestTime", (System.currentTimeMillis())); + try { + producer.send(message); + LOG.info("Producer (" + destinationName + ")\n" + message.getJMSMessageID() + " = sent messageId\n"); + + latch.countDown(); + LOG.info(" Latch count " + latch.getCount()); + LOG.info("Producer message list size = " + messageList.keySet().size()); + messageList.put(message.getJMSMessageID(), new AtomicInteger(0)); + LOG.info("Producer message list size = " + messageList.keySet().size()); + + } catch (Exception deeperException) { + LOG.info("Producer for destination (" + destinationName + ") Caught: " + deeperException); + } + + Thread.sleep(1000); + } + + LOG.info("Finished TestProducer for destination (" + destinationName + ")"); + + } catch (Exception e) { + LOG.error("Terminating TestProducer(" + destinationName + ")Caught: " + e); + } finally { + try { + if (session != null) { + session.close(); + } + if (connection != null) { + connection.close(); + } + } catch (Exception e) { + LOG.error("Closing connection/session (" + destinationName + ")Caught: " + e); + } + } + } + } + + private class TestConsumer implements Runnable, ExceptionListener, MessageListener { + + private CountDownLatch latch = null; + private int receivedMessageCounter = 0; + private boolean bFakeFail = false; + String destinationName = null; + boolean bMessageReceiptIsOrdered = true; + boolean bStop = false; + String previousMessageId = null; + + private ActiveMQConnectionFactory connectionFactory = null; + private ActiveMQConnection connection = null; + private Session session = null; + private MessageConsumer consumer = null; + + public TestConsumer(String destinationName, boolean bFakeFail) { + this.bFakeFail = bFakeFail; + latch = new CountDownLatch(totalNumberMessages * (this.bFakeFail ? (maxRedeliveries + 1) : 1)); + this.destinationName = destinationName; + } + + public CountDownLatch getLatch() { + return latch; + } + + public boolean messageReceiptIsOrdered() { + return bMessageReceiptIsOrdered; + } + + public void run() { + + try { + LOG.info("Started TestConsumer for destination (" + destinationName + ")"); + + connectionFactory = new ActiveMQConnectionFactory(jmsConnectionURI); + connection = (ActiveMQConnection) connectionFactory.createConnection(); + connection.setNonBlockingRedelivery(true); + session = connection.createSession(true, Session.SESSION_TRANSACTED); + + RedeliveryPolicy policy = connection.getRedeliveryPolicy(); + policy.setInitialRedeliveryDelay(redeliveryDelay); + policy.setBackOffMultiplier(-1); + policy.setRedeliveryDelay(redeliveryDelay); + policy.setMaximumRedeliveryDelay(-1); + policy.setUseExponentialBackOff(false); + policy.setMaximumRedeliveries(maxRedeliveries); + + connection.setExceptionListener(this); + Destination destination = session.createQueue(destinationName); + consumer = session.createConsumer(destination); + consumer.setMessageListener(this); + + connection.start(); + + while (!bStop) { + Thread.sleep(100); + } + + LOG.info("Finished TestConsumer for destination name (" + destinationName + ") remaining " + this.latch.getCount() + + " messages " + this.toString()); + + } catch (Exception e) { + LOG.error("Consumer (" + destinationName + ") Caught: " + e); + } finally { + try { + if (consumer != null) { + consumer.close(); + } + if (session != null) { + session.close(); + } + if (connection != null) { + connection.close(); + } + } catch (Exception e) { + LOG.error("Closing connection/session (" + destinationName + ")Caught: " + e); + } + } + } + + public synchronized void onException(JMSException ex) { + LOG.error("Consumer for destination, (" + destinationName + "), JMS Exception occured. Shutting down client."); + } + + public synchronized void setStop(boolean bStop) { + this.bStop = bStop; + } + + public synchronized void onMessage(Message message) { + receivedMessageCounter++; + latch.countDown(); + + LOG.info("Consumer for destination (" + destinationName + ") latch countdown: " + latch.getCount() + + " :: Number messages received " + this.receivedMessageCounter); + + try { + + if (receivedMessageCounter % (maxRedeliveries + 1) == 1) { + previousMessageId = message.getJMSMessageID(); + } + + if (bMessageReceiptIsOrdered) { + bMessageReceiptIsOrdered = previousMessageId.trim().equals(message.getJMSMessageID()); + } + + AtomicInteger counter = messageList.get(message.getJMSMessageID()); + counter.incrementAndGet(); + + LOG.info("Consumer for destination (" + destinationName + ")\n" + message.getJMSMessageID() + " = currentMessageId\n" + + previousMessageId + " = previousMessageId\n" + bMessageReceiptIsOrdered + "= bMessageReceiptIsOrdered\n" + + ">>LATENCY " + (System.currentTimeMillis() - message.getLongProperty("TestTime")) + "\n" + "message counter = " + + counter.get()); + + if (!bFakeFail) { + LOG.debug("Consumer on destination " + destinationName + " committing JMS Session for message: " + message.toString()); + session.commit(); + } else { + LOG.debug("Consumer on destination " + destinationName + " rolling back JMS Session for message: " + message.toString()); + session.rollback(); // rolls back all the consumed messages on the session to + } + + } catch (JMSException ex) { + ex.printStackTrace(); + LOG.error("Error reading JMS Message from destination " + destinationName + "."); + } + } + } +} diff --git a/activemq-core/src/test/java/org/apache/activemq/usecases/NonBlockingConsumerRedeliveryTest.java b/activemq-core/src/test/java/org/apache/activemq/usecases/NonBlockingConsumerRedeliveryTest.java new file mode 100644 index 0000000000..534799948b --- /dev/null +++ b/activemq-core/src/test/java/org/apache/activemq/usecases/NonBlockingConsumerRedeliveryTest.java @@ -0,0 +1,381 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.usecases; + +import static org.junit.Assert.*; + +import java.util.LinkedHashSet; +import java.util.concurrent.TimeUnit; + +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.Session; + +import org.apache.activemq.ActiveMQConnection; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.RedeliveryPolicy; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.util.Wait; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class NonBlockingConsumerRedeliveryTest { + private static final Logger LOG = LoggerFactory.getLogger(NonBlockingConsumerRedeliveryTest.class); + + private final String destinationName = "Destination"; + private final int MSG_COUNT = 100; + + private BrokerService broker; + private String connectionUri; + + private ActiveMQConnectionFactory connectionFactory; + + @Test + public void testMessageDeleiveredWhenNonBlockingEnabled() throws Exception { + + final LinkedHashSet received = new LinkedHashSet(); + final LinkedHashSet beforeRollback = new LinkedHashSet(); + final LinkedHashSet afterRollback = new LinkedHashSet(); + + Connection connection = connectionFactory.createConnection(); + Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); + Destination destination = session.createQueue(destinationName); + MessageConsumer consumer = session.createConsumer(destination); + + consumer.setMessageListener(new MessageListener() { + @Override + public void onMessage(Message message) { + received.add(message); + } + }); + + sendMessages(); + + session.commit(); + connection.start(); + + assertTrue("Pre-Rollback expects to receive: " + MSG_COUNT + " messages.", + Wait.waitFor(new Wait.Condition(){ + public boolean isSatisified() throws Exception { + LOG.info("Consumer has received " + received.size() + " messages."); + return received.size() == MSG_COUNT; + } + } + )); + + beforeRollback.addAll(received); + received.clear(); + session.rollback(); + + assertTrue("Post-Rollback expects to receive: " + MSG_COUNT + " messages.", + Wait.waitFor(new Wait.Condition(){ + public boolean isSatisified() throws Exception { + LOG.info("Consumer has received " + received.size() + " messages since rollback."); + return received.size() == MSG_COUNT; + } + } + )); + + afterRollback.addAll(received); + received.clear(); + + assertEquals(beforeRollback.size(), afterRollback.size()); + assertEquals(beforeRollback, afterRollback); + session.commit(); + } + + @Test + public void testMessageDeleiveryDoesntStop() throws Exception { + + final LinkedHashSet received = new LinkedHashSet(); + final LinkedHashSet beforeRollback = new LinkedHashSet(); + final LinkedHashSet afterRollback = new LinkedHashSet(); + + Connection connection = connectionFactory.createConnection(); + Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); + Destination destination = session.createQueue(destinationName); + MessageConsumer consumer = session.createConsumer(destination); + + consumer.setMessageListener(new MessageListener() { + @Override + public void onMessage(Message message) { + received.add(message); + } + }); + + sendMessages(); + connection.start(); + + assertTrue("Pre-Rollback expects to receive: " + MSG_COUNT + " messages.", + Wait.waitFor(new Wait.Condition(){ + public boolean isSatisified() throws Exception { + LOG.info("Consumer has received " + received.size() + " messages."); + return received.size() == MSG_COUNT; + } + } + )); + + beforeRollback.addAll(received); + received.clear(); + session.rollback(); + + sendMessages(); + + assertTrue("Post-Rollback expects to receive: " + MSG_COUNT + " messages.", + Wait.waitFor(new Wait.Condition(){ + public boolean isSatisified() throws Exception { + LOG.info("Consumer has received " + received.size() + " messages since rollback."); + return received.size() == MSG_COUNT * 2; + } + } + )); + + afterRollback.addAll(received); + received.clear(); + + assertEquals(beforeRollback.size() * 2, afterRollback.size()); + + session.commit(); + } + + @Test + public void testNonBlockingMessageDeleiveryIsDelayed() throws Exception { + final LinkedHashSet received = new LinkedHashSet(); + + ActiveMQConnection connection = (ActiveMQConnection) connectionFactory.createConnection(); + connection.getRedeliveryPolicy().setInitialRedeliveryDelay(TimeUnit.SECONDS.toMillis(6)); + Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); + Destination destination = session.createQueue(destinationName); + MessageConsumer consumer = session.createConsumer(destination); + + consumer.setMessageListener(new MessageListener() { + @Override + public void onMessage(Message message) { + received.add(message); + } + }); + + sendMessages(); + connection.start(); + + assertTrue("Pre-Rollback expects to receive: " + MSG_COUNT + " messages.", + Wait.waitFor(new Wait.Condition(){ + public boolean isSatisified() throws Exception { + LOG.info("Consumer has received " + received.size() + " messages."); + return received.size() == MSG_COUNT; + } + } + )); + + received.clear(); + session.rollback(); + + assertFalse("Delayed redelivery test not expecting any messages yet.", + Wait.waitFor(new Wait.Condition(){ + public boolean isSatisified() throws Exception { + return received.size() > 0; + } + }, TimeUnit.SECONDS.toMillis(4) + )); + + session.commit(); + session.close(); + } + + @Test + public void testNonBlockingMessageDeleiveryWithRollbacks() throws Exception { + final LinkedHashSet received = new LinkedHashSet(); + + ActiveMQConnection connection = (ActiveMQConnection) connectionFactory.createConnection(); + final Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); + final Destination destination = session.createQueue(destinationName); + final MessageConsumer consumer = session.createConsumer(destination); + + consumer.setMessageListener(new MessageListener() { + @Override + public void onMessage(Message message) { + received.add(message); + } + }); + + sendMessages(); + connection.start(); + + assertTrue("Pre-Rollback expects to receive: " + MSG_COUNT + " messages.", + Wait.waitFor(new Wait.Condition(){ + public boolean isSatisified() throws Exception { + LOG.info("Consumer has received " + received.size() + " messages."); + return received.size() == MSG_COUNT; + } + } + )); + + received.clear(); + + consumer.setMessageListener(new MessageListener() { + + int count = 0; + + @Override + public void onMessage(Message message) { + + if (++count > 10) { + try { + session.rollback(); + LOG.info("Rolling back session."); + count = 0; + } catch (JMSException e) { + LOG.warn("Caught an unexcepted exception: " + e.getMessage()); + } + } else { + received.add(message); + try { + session.commit(); + } catch (JMSException e) { + LOG.warn("Caught an unexcepted exception: " + e.getMessage()); + } + } + } + }); + + session.rollback(); + + assertTrue("Post-Rollback expects to receive: " + MSG_COUNT + " messages.", + Wait.waitFor(new Wait.Condition(){ + public boolean isSatisified() throws Exception { + LOG.info("Consumer has received " + received.size() + " messages since rollback."); + return received.size() == MSG_COUNT; + } + } + )); + + assertEquals(MSG_COUNT, received.size()); + session.commit(); + } + + @Test + public void testNonBlockingMessageDeleiveryWithAllRolledBack() throws Exception { + final LinkedHashSet received = new LinkedHashSet(); + final LinkedHashSet dlqed = new LinkedHashSet(); + + ActiveMQConnection connection = (ActiveMQConnection) connectionFactory.createConnection(); + connection.getRedeliveryPolicy().setMaximumRedeliveries(5); + final Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); + final Destination destination = session.createQueue(destinationName); + final Destination dlq = session.createQueue("ActiveMQ.DLQ"); + final MessageConsumer consumer = session.createConsumer(destination); + final MessageConsumer dlqConsumer = session.createConsumer(dlq); + + dlqConsumer.setMessageListener(new MessageListener() { + @Override + public void onMessage(Message message) { + dlqed.add(message); + } + }); + + consumer.setMessageListener(new MessageListener() { + @Override + public void onMessage(Message message) { + received.add(message); + } + }); + + sendMessages(); + connection.start(); + + assertTrue("Pre-Rollback expects to receive: " + MSG_COUNT + " messages.", + Wait.waitFor(new Wait.Condition(){ + public boolean isSatisified() throws Exception { + LOG.info("Consumer has received " + received.size() + " messages."); + return received.size() == MSG_COUNT; + } + } + )); + + session.rollback(); + + consumer.setMessageListener(new MessageListener() { + @Override + public void onMessage(Message message) { + try { + session.rollback(); + } catch (JMSException e) { + LOG.warn("Caught an unexcepted exception: " + e.getMessage()); + } + } + }); + + assertTrue("Post-Rollback expects to DLQ: " + MSG_COUNT + " messages.", + Wait.waitFor(new Wait.Condition(){ + public boolean isSatisified() throws Exception { + LOG.info("Consumer has received " + dlqed.size() + " messages in DLQ."); + return dlqed.size() == MSG_COUNT; + } + } + )); + + session.commit(); + } + + private void sendMessages() throws Exception { + Connection connection = connectionFactory.createConnection(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Destination destination = session.createQueue(destinationName); + MessageProducer producer = session.createProducer(destination); + for(int i = 0; i < MSG_COUNT; ++i) { + producer.send(session.createTextMessage("" + i)); + } + } + + @Before + public void startBroker() throws Exception { + broker = new BrokerService(); + broker.setDeleteAllMessagesOnStartup(true); + broker.setPersistent(false); + broker.setUseJmx(false); + broker.addConnector("tcp://0.0.0.0:0"); + broker.start(); + broker.waitUntilStarted(); + + connectionUri = broker.getTransportConnectors().get(0).getPublishableConnectString(); + connectionFactory = new ActiveMQConnectionFactory(connectionUri); + connectionFactory.setNonBlockingRedelivery(true); + + RedeliveryPolicy policy = connectionFactory.getRedeliveryPolicy(); + policy.setInitialRedeliveryDelay(TimeUnit.SECONDS.toMillis(2)); + policy.setBackOffMultiplier(-1); + policy.setRedeliveryDelay(TimeUnit.SECONDS.toMillis(2)); + policy.setMaximumRedeliveryDelay(-1); + policy.setUseExponentialBackOff(false); + policy.setMaximumRedeliveries(-1); + } + + @After + public void stopBroker() throws Exception { + broker.stop(); + broker.waitUntilStopped(); + } + +}