From f29c8011d0acae4aa7f7cc7a796dd906bd38d059 Mon Sep 17 00:00:00 2001 From: "Timothy A. Bish" Date: Fri, 4 Mar 2011 22:39:11 +0000 Subject: [PATCH] Fix for: https://issues.apache.org/jira/browse/AMQ-3045 Adds an optional maximumRedeliveryDelay that defaults to -1 to indicate no max so behavior is unchanged from previous versions. When the backoff multiplier is enabled and the maximumReliveryDelay is set > -1 then the delay returned by getNextReliveryDealy is capped by the max of either maximumReliveryDelay or redeliveryDelay preventing delays smaller than the configured redelivery delay. git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1078189 13f79535-47bb-0310-9956-ffa450edef68 --- .../org/apache/activemq/RedeliveryPolicy.java | 17 +- .../apache/activemq/RedeliveryPolicyTest.java | 196 +++++++++++------- 2 files changed, 138 insertions(+), 75 deletions(-) diff --git a/activemq-core/src/main/java/org/apache/activemq/RedeliveryPolicy.java b/activemq-core/src/main/java/org/apache/activemq/RedeliveryPolicy.java index 059188e2ef..e5e11355ec 100644 --- a/activemq-core/src/main/java/org/apache/activemq/RedeliveryPolicy.java +++ b/activemq-core/src/main/java/org/apache/activemq/RedeliveryPolicy.java @@ -22,9 +22,9 @@ import java.util.Random; /** * Configuration options used to control how messages are re-delivered when they * are rolled back. - * + * * @org.apache.xbean.XBean element="redeliveryPolicy" - * + * */ public class RedeliveryPolicy implements Cloneable, Serializable { @@ -34,6 +34,7 @@ public class RedeliveryPolicy implements Cloneable, Serializable { // +/-15% for a 30% spread -cgs private double collisionAvoidanceFactor = 0.15d; private int maximumRedeliveries = 6; + private long maximumRedeliveryDelay = -1; private long initialRedeliveryDelay = 1000L; private boolean useCollisionAvoidance; private boolean useExponentialBackOff; @@ -75,6 +76,14 @@ public class RedeliveryPolicy implements Cloneable, Serializable { this.initialRedeliveryDelay = initialRedeliveryDelay; } + public long getMaximumRedeliveryDelay() { + return maximumRedeliveryDelay; + } + + public void setMaximumRedeliveryDelay(long maximumRedeliveryDelay) { + this.maximumRedeliveryDelay = maximumRedeliveryDelay; + } + public int getMaximumRedeliveries() { return maximumRedeliveries; } @@ -90,6 +99,10 @@ public class RedeliveryPolicy implements Cloneable, Serializable { nextDelay = redeliveryDelay; } else if (useExponentialBackOff && backOffMultiplier > 1) { nextDelay = (long) (previousDelay * backOffMultiplier); + if(maximumRedeliveryDelay != -1 && nextDelay > maximumRedeliveryDelay) { + // in case the user made max redelivery delay less than redelivery delay for some reason. + nextDelay = Math.max(maximumRedeliveryDelay, redeliveryDelay); + } } else { nextDelay = previousDelay; } diff --git a/activemq-core/src/test/java/org/apache/activemq/RedeliveryPolicyTest.java b/activemq-core/src/test/java/org/apache/activemq/RedeliveryPolicyTest.java index 0911126aca..1e378d4697 100644 --- a/activemq-core/src/test/java/org/apache/activemq/RedeliveryPolicyTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/RedeliveryPolicyTest.java @@ -27,8 +27,8 @@ import org.apache.activemq.command.ActiveMQQueue; /** * Test cases used to test the JMS message exclusive consumers. - * - * + * + * */ public class RedeliveryPolicyTest extends JmsTestSupport { @@ -48,42 +48,42 @@ public class RedeliveryPolicyTest extends JmsTestSupport { // Receive a message with the JMS API RedeliveryPolicy policy = connection.getRedeliveryPolicy(); policy.setInitialRedeliveryDelay(0); - policy.setRedeliveryDelay(500); + policy.setRedeliveryDelay(500); policy.setBackOffMultiplier((short) 2); policy.setUseExponentialBackOff(true); - + connection.start(); Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); ActiveMQQueue destination = new ActiveMQQueue(getName()); MessageProducer producer = session.createProducer(destination); - + MessageConsumer consumer = session.createConsumer(destination); // Send the messages producer.send(session.createTextMessage("1st")); producer.send(session.createTextMessage("2nd")); session.commit(); - + TextMessage m; m = (TextMessage)consumer.receive(1000); assertNotNull(m); - assertEquals("1st", m.getText()); + assertEquals("1st", m.getText()); session.rollback(); // No delay on first rollback.. m = (TextMessage)consumer.receive(100); assertNotNull(m); session.rollback(); - + // Show subsequent re-delivery delay is incrementing. m = (TextMessage)consumer.receive(100); assertNull(m); - + m = (TextMessage)consumer.receive(700); assertNotNull(m); - assertEquals("1st", m.getText()); + assertEquals("1st", m.getText()); session.rollback(); - + // Show re-delivery delay is incrementing exponentially m = (TextMessage)consumer.receive(100); assertNull(m); @@ -91,8 +91,8 @@ public class RedeliveryPolicyTest extends JmsTestSupport { assertNull(m); m = (TextMessage)consumer.receive(700); assertNotNull(m); - assertEquals("1st", m.getText()); - + assertEquals("1st", m.getText()); + } @@ -110,41 +110,41 @@ public class RedeliveryPolicyTest extends JmsTestSupport { Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); ActiveMQQueue destination = new ActiveMQQueue(getName()); MessageProducer producer = session.createProducer(destination); - + MessageConsumer consumer = session.createConsumer(destination); // Send the messages producer.send(session.createTextMessage("1st")); producer.send(session.createTextMessage("2nd")); session.commit(); - + TextMessage m; m = (TextMessage)consumer.receive(1000); assertNotNull(m); - assertEquals("1st", m.getText()); + assertEquals("1st", m.getText()); session.rollback(); // No delay on first rollback.. m = (TextMessage)consumer.receive(100); assertNotNull(m); session.rollback(); - + // Show subsequent re-delivery delay is incrementing. m = (TextMessage)consumer.receive(100); assertNull(m); m = (TextMessage)consumer.receive(700); assertNotNull(m); - assertEquals("1st", m.getText()); + assertEquals("1st", m.getText()); session.rollback(); - + // The message gets redelivered after 500 ms every time since // we are not using exponential backoff. m = (TextMessage)consumer.receive(100); assertNull(m); m = (TextMessage)consumer.receive(700); assertNotNull(m); - assertEquals("1st", m.getText()); - + assertEquals("1st", m.getText()); + } /** @@ -157,12 +157,12 @@ public class RedeliveryPolicyTest extends JmsTestSupport { policy.setInitialRedeliveryDelay(100); policy.setUseExponentialBackOff(false); policy.setMaximumRedeliveries(2); - + connection.start(); Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); ActiveMQQueue destination = new ActiveMQQueue("TEST"); MessageProducer producer = session.createProducer(destination); - + MessageConsumer consumer = session.createConsumer(destination); MessageConsumer dlqConsumer = session.createConsumer(new ActiveMQQueue("ActiveMQ.DLQ")); @@ -170,38 +170,38 @@ public class RedeliveryPolicyTest extends JmsTestSupport { producer.send(session.createTextMessage("1st")); producer.send(session.createTextMessage("2nd")); session.commit(); - + TextMessage m; m = (TextMessage)consumer.receive(1000); assertNotNull(m); - assertEquals("1st", m.getText()); + assertEquals("1st", m.getText()); session.rollback(); m = (TextMessage)consumer.receive(1000); assertNotNull(m); - assertEquals("1st", m.getText()); + assertEquals("1st", m.getText()); session.rollback(); m = (TextMessage)consumer.receive(1000); assertNotNull(m); - assertEquals("1st", m.getText()); + assertEquals("1st", m.getText()); session.rollback(); - - // The last rollback should cause the 1st message to get sent to the DLQ + + // The last rollback should cause the 1st message to get sent to the DLQ m = (TextMessage)consumer.receive(1000); assertNotNull(m); - assertEquals("2nd", m.getText()); + assertEquals("2nd", m.getText()); session.commit(); - + // We should be able to get the message off the DLQ now. m = (TextMessage)dlqConsumer.receive(1000); assertNotNull(m); - assertEquals("1st", m.getText()); + assertEquals("1st", m.getText()); session.commit(); - + } - - + + /** * @throws Exception */ @@ -213,60 +213,110 @@ public class RedeliveryPolicyTest extends JmsTestSupport { policy.setUseExponentialBackOff(false); // let's set the maximum redeliveries to no maximum (ie. infinite) policy.setMaximumRedeliveries(-1); - - + + connection.start(); Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); ActiveMQQueue destination = new ActiveMQQueue("TEST"); MessageProducer producer = session.createProducer(destination); - + MessageConsumer consumer = session.createConsumer(destination); - + // Send the messages producer.send(session.createTextMessage("1st")); producer.send(session.createTextMessage("2nd")); session.commit(); - + TextMessage m; - + m = (TextMessage)consumer.receive(1000); assertNotNull(m); - assertEquals("1st", m.getText()); + assertEquals("1st", m.getText()); session.rollback(); - + //we should be able to get the 1st message redelivered until a session.commit is called m = (TextMessage)consumer.receive(1000); assertNotNull(m); - assertEquals("1st", m.getText()); - session.rollback(); - + assertEquals("1st", m.getText()); + session.rollback(); + m = (TextMessage)consumer.receive(1000); assertNotNull(m); - assertEquals("1st", m.getText()); - session.rollback(); - + assertEquals("1st", m.getText()); + session.rollback(); + m = (TextMessage)consumer.receive(1000); assertNotNull(m); - assertEquals("1st", m.getText()); - session.rollback(); - + assertEquals("1st", m.getText()); + session.rollback(); + m = (TextMessage)consumer.receive(1000); assertNotNull(m); - assertEquals("1st", m.getText()); - session.rollback(); - + assertEquals("1st", m.getText()); + session.rollback(); + m = (TextMessage)consumer.receive(1000); assertNotNull(m); - assertEquals("1st", m.getText()); - session.commit(); - + assertEquals("1st", m.getText()); + session.commit(); + m = (TextMessage)consumer.receive(1000); assertNotNull(m); - assertEquals("2nd", m.getText()); - session.commit(); - + assertEquals("2nd", m.getText()); + session.commit(); + } - + + /** + * @throws Exception + */ + public void testMaximumRedeliveryDelay() throws Exception { + + // Receive a message with the JMS API + RedeliveryPolicy policy = connection.getRedeliveryPolicy(); + policy.setInitialRedeliveryDelay(10); + policy.setUseExponentialBackOff(true); + policy.setMaximumRedeliveries(-1); + policy.setRedeliveryDelay(50); + policy.setMaximumRedeliveryDelay(1000); + policy.setBackOffMultiplier((short) 2); + policy.setUseExponentialBackOff(true); + + connection.start(); + Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); + ActiveMQQueue destination = new ActiveMQQueue("TEST"); + MessageProducer producer = session.createProducer(destination); + + MessageConsumer consumer = session.createConsumer(destination); + + // Send the messages + producer.send(session.createTextMessage("1st")); + producer.send(session.createTextMessage("2nd")); + session.commit(); + + TextMessage m; + + for(int i = 0; i < 10; ++i) { + // we should be able to get the 1st message redelivered until a session.commit is called + m = (TextMessage)consumer.receive(2000); + assertNotNull(m); + assertEquals("1st", m.getText()); + session.rollback(); + } + + m = (TextMessage)consumer.receive(2000); + assertNotNull(m); + assertEquals("1st", m.getText()); + session.commit(); + + m = (TextMessage)consumer.receive(2000); + assertNotNull(m); + assertEquals("2nd", m.getText()); + session.commit(); + + assertTrue(policy.getNextRedeliveryDelay(Long.MAX_VALUE) == 1000 ); + } + /** * @throws Exception */ @@ -278,33 +328,33 @@ public class RedeliveryPolicyTest extends JmsTestSupport { policy.setUseExponentialBackOff(false); //let's set the maximum redeliveries to 0 policy.setMaximumRedeliveries(0); - + connection.start(); Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); ActiveMQQueue destination = new ActiveMQQueue("TEST"); MessageProducer producer = session.createProducer(destination); - + MessageConsumer consumer = session.createConsumer(destination); - + // Send the messages producer.send(session.createTextMessage("1st")); producer.send(session.createTextMessage("2nd")); session.commit(); - + TextMessage m; m = (TextMessage)consumer.receive(1000); assertNotNull(m); - assertEquals("1st", m.getText()); + assertEquals("1st", m.getText()); session.rollback(); - + //the 1st message should not be redelivered since maximumRedeliveries is set to 0 m = (TextMessage)consumer.receive(1000); assertNotNull(m); - assertEquals("2nd", m.getText()); - session.commit(); - - - + assertEquals("2nd", m.getText()); + session.commit(); + + + }