Adds an optional maximumRedeliveryDelay that defaults to -1 to indicate no max so behavior is unchanged from previous versions.  When the backoff multiplier is enabled and the maximumReliveryDelay is set > -1 then the delay returned by getNextReliveryDealy is capped by the max of either maximumReliveryDelay or redeliveryDelay preventing delays smaller than the configured redelivery delay.

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1078189 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Timothy A. Bish 2011-03-04 22:39:11 +00:00
parent 69f56058f2
commit f29c8011d0
2 changed files with 138 additions and 75 deletions

View File

@ -22,9 +22,9 @@ import java.util.Random;
/** /**
* Configuration options used to control how messages are re-delivered when they * Configuration options used to control how messages are re-delivered when they
* are rolled back. * are rolled back.
* *
* @org.apache.xbean.XBean element="redeliveryPolicy" * @org.apache.xbean.XBean element="redeliveryPolicy"
* *
*/ */
public class RedeliveryPolicy implements Cloneable, Serializable { public class RedeliveryPolicy implements Cloneable, Serializable {
@ -34,6 +34,7 @@ public class RedeliveryPolicy implements Cloneable, Serializable {
// +/-15% for a 30% spread -cgs // +/-15% for a 30% spread -cgs
private double collisionAvoidanceFactor = 0.15d; private double collisionAvoidanceFactor = 0.15d;
private int maximumRedeliveries = 6; private int maximumRedeliveries = 6;
private long maximumRedeliveryDelay = -1;
private long initialRedeliveryDelay = 1000L; private long initialRedeliveryDelay = 1000L;
private boolean useCollisionAvoidance; private boolean useCollisionAvoidance;
private boolean useExponentialBackOff; private boolean useExponentialBackOff;
@ -75,6 +76,14 @@ public class RedeliveryPolicy implements Cloneable, Serializable {
this.initialRedeliveryDelay = initialRedeliveryDelay; this.initialRedeliveryDelay = initialRedeliveryDelay;
} }
public long getMaximumRedeliveryDelay() {
return maximumRedeliveryDelay;
}
public void setMaximumRedeliveryDelay(long maximumRedeliveryDelay) {
this.maximumRedeliveryDelay = maximumRedeliveryDelay;
}
public int getMaximumRedeliveries() { public int getMaximumRedeliveries() {
return maximumRedeliveries; return maximumRedeliveries;
} }
@ -90,6 +99,10 @@ public class RedeliveryPolicy implements Cloneable, Serializable {
nextDelay = redeliveryDelay; nextDelay = redeliveryDelay;
} else if (useExponentialBackOff && backOffMultiplier > 1) { } else if (useExponentialBackOff && backOffMultiplier > 1) {
nextDelay = (long) (previousDelay * backOffMultiplier); nextDelay = (long) (previousDelay * backOffMultiplier);
if(maximumRedeliveryDelay != -1 && nextDelay > maximumRedeliveryDelay) {
// in case the user made max redelivery delay less than redelivery delay for some reason.
nextDelay = Math.max(maximumRedeliveryDelay, redeliveryDelay);
}
} else { } else {
nextDelay = previousDelay; nextDelay = previousDelay;
} }

View File

@ -27,8 +27,8 @@ import org.apache.activemq.command.ActiveMQQueue;
/** /**
* Test cases used to test the JMS message exclusive consumers. * Test cases used to test the JMS message exclusive consumers.
* *
* *
*/ */
public class RedeliveryPolicyTest extends JmsTestSupport { public class RedeliveryPolicyTest extends JmsTestSupport {
@ -48,42 +48,42 @@ public class RedeliveryPolicyTest extends JmsTestSupport {
// Receive a message with the JMS API // Receive a message with the JMS API
RedeliveryPolicy policy = connection.getRedeliveryPolicy(); RedeliveryPolicy policy = connection.getRedeliveryPolicy();
policy.setInitialRedeliveryDelay(0); policy.setInitialRedeliveryDelay(0);
policy.setRedeliveryDelay(500); policy.setRedeliveryDelay(500);
policy.setBackOffMultiplier((short) 2); policy.setBackOffMultiplier((short) 2);
policy.setUseExponentialBackOff(true); policy.setUseExponentialBackOff(true);
connection.start(); connection.start();
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
ActiveMQQueue destination = new ActiveMQQueue(getName()); ActiveMQQueue destination = new ActiveMQQueue(getName());
MessageProducer producer = session.createProducer(destination); MessageProducer producer = session.createProducer(destination);
MessageConsumer consumer = session.createConsumer(destination); MessageConsumer consumer = session.createConsumer(destination);
// Send the messages // Send the messages
producer.send(session.createTextMessage("1st")); producer.send(session.createTextMessage("1st"));
producer.send(session.createTextMessage("2nd")); producer.send(session.createTextMessage("2nd"));
session.commit(); session.commit();
TextMessage m; TextMessage m;
m = (TextMessage)consumer.receive(1000); m = (TextMessage)consumer.receive(1000);
assertNotNull(m); assertNotNull(m);
assertEquals("1st", m.getText()); assertEquals("1st", m.getText());
session.rollback(); session.rollback();
// No delay on first rollback.. // No delay on first rollback..
m = (TextMessage)consumer.receive(100); m = (TextMessage)consumer.receive(100);
assertNotNull(m); assertNotNull(m);
session.rollback(); session.rollback();
// Show subsequent re-delivery delay is incrementing. // Show subsequent re-delivery delay is incrementing.
m = (TextMessage)consumer.receive(100); m = (TextMessage)consumer.receive(100);
assertNull(m); assertNull(m);
m = (TextMessage)consumer.receive(700); m = (TextMessage)consumer.receive(700);
assertNotNull(m); assertNotNull(m);
assertEquals("1st", m.getText()); assertEquals("1st", m.getText());
session.rollback(); session.rollback();
// Show re-delivery delay is incrementing exponentially // Show re-delivery delay is incrementing exponentially
m = (TextMessage)consumer.receive(100); m = (TextMessage)consumer.receive(100);
assertNull(m); assertNull(m);
@ -91,8 +91,8 @@ public class RedeliveryPolicyTest extends JmsTestSupport {
assertNull(m); assertNull(m);
m = (TextMessage)consumer.receive(700); m = (TextMessage)consumer.receive(700);
assertNotNull(m); assertNotNull(m);
assertEquals("1st", m.getText()); assertEquals("1st", m.getText());
} }
@ -110,41 +110,41 @@ public class RedeliveryPolicyTest extends JmsTestSupport {
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
ActiveMQQueue destination = new ActiveMQQueue(getName()); ActiveMQQueue destination = new ActiveMQQueue(getName());
MessageProducer producer = session.createProducer(destination); MessageProducer producer = session.createProducer(destination);
MessageConsumer consumer = session.createConsumer(destination); MessageConsumer consumer = session.createConsumer(destination);
// Send the messages // Send the messages
producer.send(session.createTextMessage("1st")); producer.send(session.createTextMessage("1st"));
producer.send(session.createTextMessage("2nd")); producer.send(session.createTextMessage("2nd"));
session.commit(); session.commit();
TextMessage m; TextMessage m;
m = (TextMessage)consumer.receive(1000); m = (TextMessage)consumer.receive(1000);
assertNotNull(m); assertNotNull(m);
assertEquals("1st", m.getText()); assertEquals("1st", m.getText());
session.rollback(); session.rollback();
// No delay on first rollback.. // No delay on first rollback..
m = (TextMessage)consumer.receive(100); m = (TextMessage)consumer.receive(100);
assertNotNull(m); assertNotNull(m);
session.rollback(); session.rollback();
// Show subsequent re-delivery delay is incrementing. // Show subsequent re-delivery delay is incrementing.
m = (TextMessage)consumer.receive(100); m = (TextMessage)consumer.receive(100);
assertNull(m); assertNull(m);
m = (TextMessage)consumer.receive(700); m = (TextMessage)consumer.receive(700);
assertNotNull(m); assertNotNull(m);
assertEquals("1st", m.getText()); assertEquals("1st", m.getText());
session.rollback(); session.rollback();
// The message gets redelivered after 500 ms every time since // The message gets redelivered after 500 ms every time since
// we are not using exponential backoff. // we are not using exponential backoff.
m = (TextMessage)consumer.receive(100); m = (TextMessage)consumer.receive(100);
assertNull(m); assertNull(m);
m = (TextMessage)consumer.receive(700); m = (TextMessage)consumer.receive(700);
assertNotNull(m); assertNotNull(m);
assertEquals("1st", m.getText()); assertEquals("1st", m.getText());
} }
/** /**
@ -157,12 +157,12 @@ public class RedeliveryPolicyTest extends JmsTestSupport {
policy.setInitialRedeliveryDelay(100); policy.setInitialRedeliveryDelay(100);
policy.setUseExponentialBackOff(false); policy.setUseExponentialBackOff(false);
policy.setMaximumRedeliveries(2); policy.setMaximumRedeliveries(2);
connection.start(); connection.start();
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
ActiveMQQueue destination = new ActiveMQQueue("TEST"); ActiveMQQueue destination = new ActiveMQQueue("TEST");
MessageProducer producer = session.createProducer(destination); MessageProducer producer = session.createProducer(destination);
MessageConsumer consumer = session.createConsumer(destination); MessageConsumer consumer = session.createConsumer(destination);
MessageConsumer dlqConsumer = session.createConsumer(new ActiveMQQueue("ActiveMQ.DLQ")); MessageConsumer dlqConsumer = session.createConsumer(new ActiveMQQueue("ActiveMQ.DLQ"));
@ -170,38 +170,38 @@ public class RedeliveryPolicyTest extends JmsTestSupport {
producer.send(session.createTextMessage("1st")); producer.send(session.createTextMessage("1st"));
producer.send(session.createTextMessage("2nd")); producer.send(session.createTextMessage("2nd"));
session.commit(); session.commit();
TextMessage m; TextMessage m;
m = (TextMessage)consumer.receive(1000); m = (TextMessage)consumer.receive(1000);
assertNotNull(m); assertNotNull(m);
assertEquals("1st", m.getText()); assertEquals("1st", m.getText());
session.rollback(); session.rollback();
m = (TextMessage)consumer.receive(1000); m = (TextMessage)consumer.receive(1000);
assertNotNull(m); assertNotNull(m);
assertEquals("1st", m.getText()); assertEquals("1st", m.getText());
session.rollback(); session.rollback();
m = (TextMessage)consumer.receive(1000); m = (TextMessage)consumer.receive(1000);
assertNotNull(m); assertNotNull(m);
assertEquals("1st", m.getText()); assertEquals("1st", m.getText());
session.rollback(); session.rollback();
// The last rollback should cause the 1st message to get sent to the DLQ // The last rollback should cause the 1st message to get sent to the DLQ
m = (TextMessage)consumer.receive(1000); m = (TextMessage)consumer.receive(1000);
assertNotNull(m); assertNotNull(m);
assertEquals("2nd", m.getText()); assertEquals("2nd", m.getText());
session.commit(); session.commit();
// We should be able to get the message off the DLQ now. // We should be able to get the message off the DLQ now.
m = (TextMessage)dlqConsumer.receive(1000); m = (TextMessage)dlqConsumer.receive(1000);
assertNotNull(m); assertNotNull(m);
assertEquals("1st", m.getText()); assertEquals("1st", m.getText());
session.commit(); session.commit();
} }
/** /**
* @throws Exception * @throws Exception
*/ */
@ -213,60 +213,110 @@ public class RedeliveryPolicyTest extends JmsTestSupport {
policy.setUseExponentialBackOff(false); policy.setUseExponentialBackOff(false);
// let's set the maximum redeliveries to no maximum (ie. infinite) // let's set the maximum redeliveries to no maximum (ie. infinite)
policy.setMaximumRedeliveries(-1); policy.setMaximumRedeliveries(-1);
connection.start(); connection.start();
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
ActiveMQQueue destination = new ActiveMQQueue("TEST"); ActiveMQQueue destination = new ActiveMQQueue("TEST");
MessageProducer producer = session.createProducer(destination); MessageProducer producer = session.createProducer(destination);
MessageConsumer consumer = session.createConsumer(destination); MessageConsumer consumer = session.createConsumer(destination);
// Send the messages // Send the messages
producer.send(session.createTextMessage("1st")); producer.send(session.createTextMessage("1st"));
producer.send(session.createTextMessage("2nd")); producer.send(session.createTextMessage("2nd"));
session.commit(); session.commit();
TextMessage m; TextMessage m;
m = (TextMessage)consumer.receive(1000); m = (TextMessage)consumer.receive(1000);
assertNotNull(m); assertNotNull(m);
assertEquals("1st", m.getText()); assertEquals("1st", m.getText());
session.rollback(); session.rollback();
//we should be able to get the 1st message redelivered until a session.commit is called //we should be able to get the 1st message redelivered until a session.commit is called
m = (TextMessage)consumer.receive(1000); m = (TextMessage)consumer.receive(1000);
assertNotNull(m); assertNotNull(m);
assertEquals("1st", m.getText()); assertEquals("1st", m.getText());
session.rollback(); session.rollback();
m = (TextMessage)consumer.receive(1000); m = (TextMessage)consumer.receive(1000);
assertNotNull(m); assertNotNull(m);
assertEquals("1st", m.getText()); assertEquals("1st", m.getText());
session.rollback(); session.rollback();
m = (TextMessage)consumer.receive(1000); m = (TextMessage)consumer.receive(1000);
assertNotNull(m); assertNotNull(m);
assertEquals("1st", m.getText()); assertEquals("1st", m.getText());
session.rollback(); session.rollback();
m = (TextMessage)consumer.receive(1000); m = (TextMessage)consumer.receive(1000);
assertNotNull(m); assertNotNull(m);
assertEquals("1st", m.getText()); assertEquals("1st", m.getText());
session.rollback(); session.rollback();
m = (TextMessage)consumer.receive(1000); m = (TextMessage)consumer.receive(1000);
assertNotNull(m); assertNotNull(m);
assertEquals("1st", m.getText()); assertEquals("1st", m.getText());
session.commit(); session.commit();
m = (TextMessage)consumer.receive(1000); m = (TextMessage)consumer.receive(1000);
assertNotNull(m); assertNotNull(m);
assertEquals("2nd", m.getText()); assertEquals("2nd", m.getText());
session.commit(); session.commit();
} }
/**
* @throws Exception
*/
public void testMaximumRedeliveryDelay() throws Exception {
// Receive a message with the JMS API
RedeliveryPolicy policy = connection.getRedeliveryPolicy();
policy.setInitialRedeliveryDelay(10);
policy.setUseExponentialBackOff(true);
policy.setMaximumRedeliveries(-1);
policy.setRedeliveryDelay(50);
policy.setMaximumRedeliveryDelay(1000);
policy.setBackOffMultiplier((short) 2);
policy.setUseExponentialBackOff(true);
connection.start();
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
ActiveMQQueue destination = new ActiveMQQueue("TEST");
MessageProducer producer = session.createProducer(destination);
MessageConsumer consumer = session.createConsumer(destination);
// Send the messages
producer.send(session.createTextMessage("1st"));
producer.send(session.createTextMessage("2nd"));
session.commit();
TextMessage m;
for(int i = 0; i < 10; ++i) {
// we should be able to get the 1st message redelivered until a session.commit is called
m = (TextMessage)consumer.receive(2000);
assertNotNull(m);
assertEquals("1st", m.getText());
session.rollback();
}
m = (TextMessage)consumer.receive(2000);
assertNotNull(m);
assertEquals("1st", m.getText());
session.commit();
m = (TextMessage)consumer.receive(2000);
assertNotNull(m);
assertEquals("2nd", m.getText());
session.commit();
assertTrue(policy.getNextRedeliveryDelay(Long.MAX_VALUE) == 1000 );
}
/** /**
* @throws Exception * @throws Exception
*/ */
@ -278,33 +328,33 @@ public class RedeliveryPolicyTest extends JmsTestSupport {
policy.setUseExponentialBackOff(false); policy.setUseExponentialBackOff(false);
//let's set the maximum redeliveries to 0 //let's set the maximum redeliveries to 0
policy.setMaximumRedeliveries(0); policy.setMaximumRedeliveries(0);
connection.start(); connection.start();
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
ActiveMQQueue destination = new ActiveMQQueue("TEST"); ActiveMQQueue destination = new ActiveMQQueue("TEST");
MessageProducer producer = session.createProducer(destination); MessageProducer producer = session.createProducer(destination);
MessageConsumer consumer = session.createConsumer(destination); MessageConsumer consumer = session.createConsumer(destination);
// Send the messages // Send the messages
producer.send(session.createTextMessage("1st")); producer.send(session.createTextMessage("1st"));
producer.send(session.createTextMessage("2nd")); producer.send(session.createTextMessage("2nd"));
session.commit(); session.commit();
TextMessage m; TextMessage m;
m = (TextMessage)consumer.receive(1000); m = (TextMessage)consumer.receive(1000);
assertNotNull(m); assertNotNull(m);
assertEquals("1st", m.getText()); assertEquals("1st", m.getText());
session.rollback(); session.rollback();
//the 1st message should not be redelivered since maximumRedeliveries is set to 0 //the 1st message should not be redelivered since maximumRedeliveries is set to 0
m = (TextMessage)consumer.receive(1000); m = (TextMessage)consumer.receive(1000);
assertNotNull(m); assertNotNull(m);
assertEquals("2nd", m.getText()); assertEquals("2nd", m.getText());
session.commit(); session.commit();
} }