mirror of https://github.com/apache/activemq.git
applied patch for http://issues.apache.org/activemq/browse/AMQ-967
git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@476523 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
97d0a61057
commit
979c5d55ee
|
@ -802,7 +802,8 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
|
||||||
redeliveryDelay = redeliveryPolicy.getRedeliveryDelay(redeliveryDelay);
|
redeliveryDelay = redeliveryPolicy.getRedeliveryDelay(redeliveryDelay);
|
||||||
|
|
||||||
rollbackCounter++;
|
rollbackCounter++;
|
||||||
if(rollbackCounter>redeliveryPolicy.getMaximumRedeliveries()){
|
if(redeliveryPolicy.getMaximumRedeliveries() != RedeliveryPolicy.NO_MAXIMUM_REDELIVERIES
|
||||||
|
&& rollbackCounter>redeliveryPolicy.getMaximumRedeliveries()){
|
||||||
// We need to NACK the messages so that they get sent to the
|
// We need to NACK the messages so that they get sent to the
|
||||||
// DLQ.
|
// DLQ.
|
||||||
// Acknowledge the last message.
|
// Acknowledge the last message.
|
||||||
|
|
|
@ -714,7 +714,8 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
|
||||||
|
|
||||||
RedeliveryPolicy redeliveryPolicy = connection.getRedeliveryPolicy();
|
RedeliveryPolicy redeliveryPolicy = connection.getRedeliveryPolicy();
|
||||||
int redeliveryCounter = md.getMessage().getRedeliveryCounter();
|
int redeliveryCounter = md.getMessage().getRedeliveryCounter();
|
||||||
if (redeliveryCounter > redeliveryPolicy.getMaximumRedeliveries()) {
|
if (redeliveryPolicy.getMaximumRedeliveries() != RedeliveryPolicy.NO_MAXIMUM_REDELIVERIES
|
||||||
|
&& redeliveryCounter > redeliveryPolicy.getMaximumRedeliveries()) {
|
||||||
|
|
||||||
// We need to NACK the messages so that they get sent to the
|
// We need to NACK the messages so that they get sent to the
|
||||||
// DLQ.
|
// DLQ.
|
||||||
|
|
|
@ -30,6 +30,8 @@ import java.util.Random;
|
||||||
*/
|
*/
|
||||||
public class RedeliveryPolicy implements Cloneable, Serializable {
|
public class RedeliveryPolicy implements Cloneable, Serializable {
|
||||||
|
|
||||||
|
public static final int NO_MAXIMUM_REDELIVERIES = -1;
|
||||||
|
|
||||||
// +/-15% for a 30% spread -cgs
|
// +/-15% for a 30% spread -cgs
|
||||||
protected double collisionAvoidanceFactor = 0.15d;
|
protected double collisionAvoidanceFactor = 0.15d;
|
||||||
protected int maximumRedeliveries = 6;
|
protected int maximumRedeliveries = 6;
|
||||||
|
|
|
@ -198,5 +198,111 @@ public class RedeliveryPolicyTest extends JmsTestSupport {
|
||||||
assertEquals("1st", m.getText());
|
assertEquals("1st", m.getText());
|
||||||
session.commit();
|
session.commit();
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @throws Exception
|
||||||
|
*/
|
||||||
|
public void testInfiniteMaximumNumberOfRedeliveries() throws Exception {
|
||||||
|
|
||||||
|
// Receive a message with the JMS API
|
||||||
|
RedeliveryPolicy policy = connection.getRedeliveryPolicy();
|
||||||
|
policy.setInitialRedeliveryDelay(100);
|
||||||
|
policy.setUseExponentialBackOff(false);
|
||||||
|
// let's set the maximum redeliveries to no maximum (ie. infinite)
|
||||||
|
policy.setMaximumRedeliveries(-1);
|
||||||
|
|
||||||
|
|
||||||
|
connection.start();
|
||||||
|
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
|
||||||
|
ActiveMQQueue destination = new ActiveMQQueue("TEST");
|
||||||
|
MessageProducer producer = session.createProducer(destination);
|
||||||
|
|
||||||
|
MessageConsumer consumer = session.createConsumer(destination);
|
||||||
|
|
||||||
|
// Send the messages
|
||||||
|
producer.send(session.createTextMessage("1st"));
|
||||||
|
producer.send(session.createTextMessage("2nd"));
|
||||||
|
session.commit();
|
||||||
|
|
||||||
|
TextMessage m;
|
||||||
|
|
||||||
|
m = (TextMessage)consumer.receive(1000);
|
||||||
|
assertNotNull(m);
|
||||||
|
assertEquals("1st", m.getText());
|
||||||
|
session.rollback();
|
||||||
|
|
||||||
|
//we should be able to get the 1st message redelivered until a session.commit is called
|
||||||
|
m = (TextMessage)consumer.receive(1000);
|
||||||
|
assertNotNull(m);
|
||||||
|
assertEquals("1st", m.getText());
|
||||||
|
session.rollback();
|
||||||
|
|
||||||
|
m = (TextMessage)consumer.receive(1000);
|
||||||
|
assertNotNull(m);
|
||||||
|
assertEquals("1st", m.getText());
|
||||||
|
session.rollback();
|
||||||
|
|
||||||
|
m = (TextMessage)consumer.receive(1000);
|
||||||
|
assertNotNull(m);
|
||||||
|
assertEquals("1st", m.getText());
|
||||||
|
session.rollback();
|
||||||
|
|
||||||
|
m = (TextMessage)consumer.receive(1000);
|
||||||
|
assertNotNull(m);
|
||||||
|
assertEquals("1st", m.getText());
|
||||||
|
session.rollback();
|
||||||
|
|
||||||
|
m = (TextMessage)consumer.receive(1000);
|
||||||
|
assertNotNull(m);
|
||||||
|
assertEquals("1st", m.getText());
|
||||||
|
session.commit();
|
||||||
|
|
||||||
|
m = (TextMessage)consumer.receive(1000);
|
||||||
|
assertNotNull(m);
|
||||||
|
assertEquals("2nd", m.getText());
|
||||||
|
session.commit();
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @throws Exception
|
||||||
|
*/
|
||||||
|
public void testZeroMaximumNumberOfRedeliveries() throws Exception {
|
||||||
|
|
||||||
|
// Receive a message with the JMS API
|
||||||
|
RedeliveryPolicy policy = connection.getRedeliveryPolicy();
|
||||||
|
policy.setInitialRedeliveryDelay(100);
|
||||||
|
policy.setUseExponentialBackOff(false);
|
||||||
|
//let's set the maximum redeliveries to 0
|
||||||
|
policy.setMaximumRedeliveries(0);
|
||||||
|
|
||||||
|
connection.start();
|
||||||
|
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
|
||||||
|
ActiveMQQueue destination = new ActiveMQQueue("TEST");
|
||||||
|
MessageProducer producer = session.createProducer(destination);
|
||||||
|
|
||||||
|
MessageConsumer consumer = session.createConsumer(destination);
|
||||||
|
|
||||||
|
// Send the messages
|
||||||
|
producer.send(session.createTextMessage("1st"));
|
||||||
|
producer.send(session.createTextMessage("2nd"));
|
||||||
|
session.commit();
|
||||||
|
|
||||||
|
TextMessage m;
|
||||||
|
m = (TextMessage)consumer.receive(1000);
|
||||||
|
assertNotNull(m);
|
||||||
|
assertEquals("1st", m.getText());
|
||||||
|
session.rollback();
|
||||||
|
|
||||||
|
//the 1st message should not be redelivered since maximumRedeliveries is set to 0
|
||||||
|
m = (TextMessage)consumer.receive(1000);
|
||||||
|
assertNotNull(m);
|
||||||
|
assertEquals("2nd", m.getText());
|
||||||
|
session.commit();
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue