From 81f3935cf33feeca9e3380bc01f132e2fc45e57b Mon Sep 17 00:00:00 2001 From: Gary Tully Date: Fri, 26 Nov 2010 14:45:17 +0000 Subject: [PATCH] resolve: https://issues.apache.org/activemq/browse/AMQ-3056 - do not throw on redelivery ack for a topic git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1039392 13f79535-47bb-0310-9956-ffa450edef68 --- .../broker/region/TopicSubscription.java | 3 ++ .../activemq/usecases/TopicRedeliverTest.java | 51 +++++++++++++++++++ 2 files changed, 54 insertions(+) diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java index b1286883d9..5a876f8660 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java @@ -282,6 +282,9 @@ public class TopicSubscription extends AbstractSubscription { dequeueCounter.addAndGet(ack.getMessageCount()); dispatchMatched(); return; + } else if (ack.isRedeliveredAck()) { + // nothing to do atm + return; } throw new JMSException("Invalid acknowledgment: " + ack); } diff --git a/activemq-core/src/test/java/org/apache/activemq/usecases/TopicRedeliverTest.java b/activemq-core/src/test/java/org/apache/activemq/usecases/TopicRedeliverTest.java index 637ce2c1e9..cc80907448 100755 --- a/activemq-core/src/test/java/org/apache/activemq/usecases/TopicRedeliverTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/usecases/TopicRedeliverTest.java @@ -16,9 +16,12 @@ */ package org.apache.activemq.usecases; +import java.util.concurrent.atomic.AtomicBoolean; import javax.jms.Connection; import javax.jms.DeliveryMode; import javax.jms.Destination; +import javax.jms.ExceptionListener; +import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageProducer; @@ -28,12 +31,15 @@ import javax.jms.Topic; import org.apache.activemq.test.TestSupport; import org.apache.activemq.util.IdGenerator; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; /** * @version $Revision: 1.1.1.1 $ */ public class TopicRedeliverTest extends TestSupport { + private static final Log LOG = LogFactory.getLog(TopicRedeliverTest.class); private static final int RECEIVE_TIMEOUT = 10000; protected int deliveryMode = DeliveryMode.PERSISTENT; @@ -141,6 +147,51 @@ public class TopicRedeliverTest extends TestSupport { connection.close(); } + public void testNoExceptionOnRedeliveryAckWithSimpleTopicConsumer() throws Exception { + Destination destination = createDestination(getClass().getName()); + Connection connection = createConnection(); + final AtomicBoolean gotException = new AtomicBoolean(); + connection.setExceptionListener(new ExceptionListener() { + public void onException(JMSException exception) { + LOG.error("unexpected ex:" + exception); + gotException.set(true); + } + }); + connection.setClientID(idGen.generateId()); + connection.start(); + Session consumerSession = connection.createSession(true, Session.CLIENT_ACKNOWLEDGE); + MessageConsumer consumer = null; + if (topic) { + consumer = consumerSession.createConsumer((Topic)destination); + } else { + consumer = consumerSession.createConsumer(destination); + } + Session producerSession = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = producerSession.createProducer(destination); + producer.setDeliveryMode(deliveryMode); + + TextMessage sentMsg = producerSession.createTextMessage(); + sentMsg.setText("msg1"); + producer.send(sentMsg); + producerSession.commit(); + + Message recMsg = consumer.receive(RECEIVE_TIMEOUT); + assertFalse(recMsg.getJMSRedelivered()); + recMsg = consumer.receive(RECEIVE_TIMEOUT); + consumerSession.rollback(); + recMsg = consumer.receive(RECEIVE_TIMEOUT); + assertTrue(recMsg.getJMSRedelivered()); + consumerSession.rollback(); + recMsg = consumer.receive(RECEIVE_TIMEOUT); + assertTrue(recMsg.getJMSRedelivered()); + consumerSession.commit(); + assertTrue(recMsg.equals(sentMsg)); + assertTrue(recMsg.getJMSRedelivered()); + connection.close(); + + assertFalse("no exception", gotException.get()); + } + /** * Check a session is rollbacked on a Session close(); *