mirror of https://github.com/apache/activemq.git
applied patch for http://issues.apache.org/activemq/browse/AMQ-967
git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/branches/activemq-4.1@476525 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
e8e8b43b76
commit
45b71de844
|
@ -782,7 +782,8 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
|
|||
redeliveryDelay = redeliveryPolicy.getRedeliveryDelay(redeliveryDelay);
|
||||
|
||||
rollbackCounter++;
|
||||
if(rollbackCounter>redeliveryPolicy.getMaximumRedeliveries()){
|
||||
if(redeliveryPolicy.getMaximumRedeliveries() != RedeliveryPolicy.NO_MAXIMUM_REDELIVERIES
|
||||
&& rollbackCounter>redeliveryPolicy.getMaximumRedeliveries()){
|
||||
// We need to NACK the messages so that they get sent to the
|
||||
// DLQ.
|
||||
// Acknowledge the last message.
|
||||
|
|
|
@ -712,7 +712,8 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
|
|||
|
||||
RedeliveryPolicy redeliveryPolicy = connection.getRedeliveryPolicy();
|
||||
int redeliveryCounter = md.getMessage().getRedeliveryCounter();
|
||||
if (redeliveryCounter > redeliveryPolicy.getMaximumRedeliveries()) {
|
||||
if (redeliveryPolicy.getMaximumRedeliveries() != RedeliveryPolicy.NO_MAXIMUM_REDELIVERIES
|
||||
&& redeliveryCounter > redeliveryPolicy.getMaximumRedeliveries()) {
|
||||
|
||||
// We need to NACK the messages so that they get sent to the
|
||||
// DLQ.
|
||||
|
|
|
@ -30,6 +30,8 @@ import java.util.Random;
|
|||
*/
|
||||
public class RedeliveryPolicy implements Cloneable, Serializable {
|
||||
|
||||
public static final int NO_MAXIMUM_REDELIVERIES = -1;
|
||||
|
||||
// +/-15% for a 30% spread -cgs
|
||||
protected double collisionAvoidanceFactor = 0.15d;
|
||||
protected int maximumRedeliveries = 6;
|
||||
|
|
|
@ -199,4 +199,109 @@ public class RedeliveryPolicyTest extends JmsTestSupport {
|
|||
session.commit();
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* @throws Exception
|
||||
*/
|
||||
public void testInfiniteMaximumNumberOfRedeliveries() throws Exception {
|
||||
|
||||
// Receive a message with the JMS API
|
||||
RedeliveryPolicy policy = connection.getRedeliveryPolicy();
|
||||
policy.setInitialRedeliveryDelay(100);
|
||||
policy.setUseExponentialBackOff(false);
|
||||
// let's set the maximum redeliveries to no maximum (ie. infinite)
|
||||
policy.setMaximumRedeliveries(-1);
|
||||
|
||||
|
||||
connection.start();
|
||||
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
|
||||
ActiveMQQueue destination = new ActiveMQQueue("TEST");
|
||||
MessageProducer producer = session.createProducer(destination);
|
||||
|
||||
MessageConsumer consumer = session.createConsumer(destination);
|
||||
|
||||
// Send the messages
|
||||
producer.send(session.createTextMessage("1st"));
|
||||
producer.send(session.createTextMessage("2nd"));
|
||||
session.commit();
|
||||
|
||||
TextMessage m;
|
||||
|
||||
m = (TextMessage)consumer.receive(1000);
|
||||
assertNotNull(m);
|
||||
assertEquals("1st", m.getText());
|
||||
session.rollback();
|
||||
|
||||
//we should be able to get the 1st message redelivered until a session.commit is called
|
||||
m = (TextMessage)consumer.receive(1000);
|
||||
assertNotNull(m);
|
||||
assertEquals("1st", m.getText());
|
||||
session.rollback();
|
||||
|
||||
m = (TextMessage)consumer.receive(1000);
|
||||
assertNotNull(m);
|
||||
assertEquals("1st", m.getText());
|
||||
session.rollback();
|
||||
|
||||
m = (TextMessage)consumer.receive(1000);
|
||||
assertNotNull(m);
|
||||
assertEquals("1st", m.getText());
|
||||
session.rollback();
|
||||
|
||||
m = (TextMessage)consumer.receive(1000);
|
||||
assertNotNull(m);
|
||||
assertEquals("1st", m.getText());
|
||||
session.rollback();
|
||||
|
||||
m = (TextMessage)consumer.receive(1000);
|
||||
assertNotNull(m);
|
||||
assertEquals("1st", m.getText());
|
||||
session.commit();
|
||||
|
||||
m = (TextMessage)consumer.receive(1000);
|
||||
assertNotNull(m);
|
||||
assertEquals("2nd", m.getText());
|
||||
session.commit();
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* @throws Exception
|
||||
*/
|
||||
public void testZeroMaximumNumberOfRedeliveries() throws Exception {
|
||||
|
||||
// Receive a message with the JMS API
|
||||
RedeliveryPolicy policy = connection.getRedeliveryPolicy();
|
||||
policy.setInitialRedeliveryDelay(100);
|
||||
policy.setUseExponentialBackOff(false);
|
||||
//let's set the maximum redeliveries to 0
|
||||
policy.setMaximumRedeliveries(0);
|
||||
|
||||
connection.start();
|
||||
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
|
||||
ActiveMQQueue destination = new ActiveMQQueue("TEST");
|
||||
MessageProducer producer = session.createProducer(destination);
|
||||
|
||||
MessageConsumer consumer = session.createConsumer(destination);
|
||||
|
||||
// Send the messages
|
||||
producer.send(session.createTextMessage("1st"));
|
||||
producer.send(session.createTextMessage("2nd"));
|
||||
session.commit();
|
||||
|
||||
TextMessage m;
|
||||
m = (TextMessage)consumer.receive(1000);
|
||||
assertNotNull(m);
|
||||
assertEquals("1st", m.getText());
|
||||
session.rollback();
|
||||
|
||||
//the 1st message should not be redelivered since maximumRedeliveries is set to 0
|
||||
m = (TextMessage)consumer.receive(1000);
|
||||
assertNotNull(m);
|
||||
assertEquals("2nd", m.getText());
|
||||
session.commit();
|
||||
|
||||
|
||||
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue