resolve: https://issues.apache.org/activemq/browse/AMQ-3056 - do not throw on redelivery ack for a topic

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1039392 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gary Tully 2010-11-26 14:45:17 +00:00
parent dbaf2b1c8c
commit 81f3935cf3
2 changed files with 54 additions and 0 deletions

View File

@ -282,6 +282,9 @@ public class TopicSubscription extends AbstractSubscription {
dequeueCounter.addAndGet(ack.getMessageCount());
dispatchMatched();
return;
} else if (ack.isRedeliveredAck()) {
// nothing to do atm
return;
}
throw new JMSException("Invalid acknowledgment: " + ack);
}

View File

@ -16,9 +16,12 @@
*/
package org.apache.activemq.usecases;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
@ -28,12 +31,15 @@ import javax.jms.Topic;
import org.apache.activemq.test.TestSupport;
import org.apache.activemq.util.IdGenerator;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
* @version $Revision: 1.1.1.1 $
*/
public class TopicRedeliverTest extends TestSupport {
private static final Log LOG = LogFactory.getLog(TopicRedeliverTest.class);
private static final int RECEIVE_TIMEOUT = 10000;
protected int deliveryMode = DeliveryMode.PERSISTENT;
@ -141,6 +147,51 @@ public class TopicRedeliverTest extends TestSupport {
connection.close();
}
public void testNoExceptionOnRedeliveryAckWithSimpleTopicConsumer() throws Exception {
Destination destination = createDestination(getClass().getName());
Connection connection = createConnection();
final AtomicBoolean gotException = new AtomicBoolean();
connection.setExceptionListener(new ExceptionListener() {
public void onException(JMSException exception) {
LOG.error("unexpected ex:" + exception);
gotException.set(true);
}
});
connection.setClientID(idGen.generateId());
connection.start();
Session consumerSession = connection.createSession(true, Session.CLIENT_ACKNOWLEDGE);
MessageConsumer consumer = null;
if (topic) {
consumer = consumerSession.createConsumer((Topic)destination);
} else {
consumer = consumerSession.createConsumer(destination);
}
Session producerSession = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = producerSession.createProducer(destination);
producer.setDeliveryMode(deliveryMode);
TextMessage sentMsg = producerSession.createTextMessage();
sentMsg.setText("msg1");
producer.send(sentMsg);
producerSession.commit();
Message recMsg = consumer.receive(RECEIVE_TIMEOUT);
assertFalse(recMsg.getJMSRedelivered());
recMsg = consumer.receive(RECEIVE_TIMEOUT);
consumerSession.rollback();
recMsg = consumer.receive(RECEIVE_TIMEOUT);
assertTrue(recMsg.getJMSRedelivered());
consumerSession.rollback();
recMsg = consumer.receive(RECEIVE_TIMEOUT);
assertTrue(recMsg.getJMSRedelivered());
consumerSession.commit();
assertTrue(recMsg.equals(sentMsg));
assertTrue(recMsg.getJMSRedelivered());
connection.close();
assertFalse("no exception", gotException.get());
}
/**
* Check a session is rollbacked on a Session close();
*