This commit is contained in:
gtully 2014-03-06 22:39:16 +00:00
parent f555d90e92
commit 33b88d34a9
3 changed files with 32 additions and 14 deletions

View File

@ -98,18 +98,14 @@ public class RedeliveryPolicy extends DestinationMapEntry implements Cloneable,
} }
public long getNextRedeliveryDelay(long previousDelay) { public long getNextRedeliveryDelay(long previousDelay) {
long nextDelay; long nextDelay = redeliveryDelay;
if (previousDelay == 0) { if (previousDelay > 0 && useExponentialBackOff && backOffMultiplier > 1) {
nextDelay = redeliveryDelay;
} else if (useExponentialBackOff && backOffMultiplier > 1) {
nextDelay = (long) (previousDelay * backOffMultiplier); nextDelay = (long) (previousDelay * backOffMultiplier);
if(maximumRedeliveryDelay != -1 && nextDelay > maximumRedeliveryDelay) { if(maximumRedeliveryDelay != -1 && nextDelay > maximumRedeliveryDelay) {
// in case the user made max redelivery delay less than redelivery delay for some reason. // in case the user made max redelivery delay less than redelivery delay for some reason.
nextDelay = Math.max(maximumRedeliveryDelay, redeliveryDelay); nextDelay = Math.max(maximumRedeliveryDelay, redeliveryDelay);
} }
} else {
nextDelay = previousDelay;
} }
if (useCollisionAvoidance) { if (useCollisionAvoidance) {

View File

@ -43,6 +43,27 @@ public class RedeliveryPolicyTest extends JmsTestSupport {
junit.textui.TestRunner.run(suite()); junit.textui.TestRunner.run(suite());
} }
public void testGetNext() throws Exception {
RedeliveryPolicy policy = new RedeliveryPolicy();
policy.setInitialRedeliveryDelay(0);
policy.setRedeliveryDelay(500);
policy.setBackOffMultiplier((short) 2);
policy.setUseExponentialBackOff(true);
long delay = policy.getNextRedeliveryDelay(0);
assertEquals(500, delay);
delay = policy.getNextRedeliveryDelay(delay);
assertEquals(500*2, delay);
delay = policy.getNextRedeliveryDelay(delay);
assertEquals(500*4, delay);
policy.setUseExponentialBackOff(false);
delay = policy.getNextRedeliveryDelay(delay);
assertEquals(500, delay);
}
/** /**
* @throws Exception * @throws Exception
*/ */
@ -185,7 +206,7 @@ public class RedeliveryPolicyTest extends JmsTestSupport {
assertEquals("1st", m.getText()); assertEquals("1st", m.getText());
session.rollback(); session.rollback();
m = (TextMessage)consumer.receive(1000); m = (TextMessage)consumer.receive(2000);
assertNotNull(m); assertNotNull(m);
assertEquals("1st", m.getText()); assertEquals("1st", m.getText());
session.rollback(); session.rollback();
@ -245,27 +266,27 @@ public class RedeliveryPolicyTest extends JmsTestSupport {
assertEquals("1st", m.getText()); assertEquals("1st", m.getText());
session.rollback(); session.rollback();
m = (TextMessage)consumer.receive(1000); m = (TextMessage)consumer.receive(2000);
assertNotNull(m); assertNotNull(m);
assertEquals("1st", m.getText()); assertEquals("1st", m.getText());
session.rollback(); session.rollback();
m = (TextMessage)consumer.receive(1000); m = (TextMessage)consumer.receive(2000);
assertNotNull(m); assertNotNull(m);
assertEquals("1st", m.getText()); assertEquals("1st", m.getText());
session.rollback(); session.rollback();
m = (TextMessage)consumer.receive(1000); m = (TextMessage)consumer.receive(2000);
assertNotNull(m); assertNotNull(m);
assertEquals("1st", m.getText()); assertEquals("1st", m.getText());
session.rollback(); session.rollback();
m = (TextMessage)consumer.receive(1000); m = (TextMessage)consumer.receive(2000);
assertNotNull(m); assertNotNull(m);
assertEquals("1st", m.getText()); assertEquals("1st", m.getText());
session.commit(); session.commit();
m = (TextMessage)consumer.receive(1000); m = (TextMessage)consumer.receive(2000);
assertNotNull(m); assertNotNull(m);
assertEquals("2nd", m.getText()); assertEquals("2nd", m.getText());
session.commit(); session.commit();

View File

@ -40,6 +40,7 @@ public class BrokerRedeliveryTest extends org.apache.activemq.TestSupport {
final ActiveMQQueue destination = new ActiveMQQueue("Redelivery"); final ActiveMQQueue destination = new ActiveMQQueue("Redelivery");
final String data = "hi"; final String data = "hi";
final long redeliveryDelayMillis = 2000; final long redeliveryDelayMillis = 2000;
final long initialRedeliveryDelayMillis = 4000;
int maxBrokerRedeliveries = 2; int maxBrokerRedeliveries = 2;
public void testScheduledRedelivery() throws Exception { public void testScheduledRedelivery() throws Exception {
@ -79,7 +80,7 @@ public class BrokerRedeliveryTest extends org.apache.activemq.TestSupport {
LOG.info("got: " + brokerRedeliveryMessage); LOG.info("got: " + brokerRedeliveryMessage);
assertNotNull("got message via broker redelivery after delay", brokerRedeliveryMessage); assertNotNull("got message via broker redelivery after delay", brokerRedeliveryMessage);
assertEquals("message matches", message.getStringProperty("data"), brokerRedeliveryMessage.getStringProperty("data")); assertEquals("message matches", message.getStringProperty("data"), brokerRedeliveryMessage.getStringProperty("data"));
assertEquals("has expiryDelay specified", redeliveryDelayMillis, brokerRedeliveryMessage.getLongProperty(RedeliveryPlugin.REDELIVERY_DELAY)); assertEquals("has expiryDelay specified", i == 0 ? initialRedeliveryDelayMillis : redeliveryDelayMillis, brokerRedeliveryMessage.getLongProperty(RedeliveryPlugin.REDELIVERY_DELAY));
consumerSession.rollback(); consumerSession.rollback();
} }
@ -149,7 +150,7 @@ public class BrokerRedeliveryTest extends org.apache.activemq.TestSupport {
RedeliveryPolicy brokerRedeliveryPolicy = new RedeliveryPolicy(); RedeliveryPolicy brokerRedeliveryPolicy = new RedeliveryPolicy();
brokerRedeliveryPolicy.setRedeliveryDelay(redeliveryDelayMillis); brokerRedeliveryPolicy.setRedeliveryDelay(redeliveryDelayMillis);
brokerRedeliveryPolicy.setInitialRedeliveryDelay(redeliveryDelayMillis); brokerRedeliveryPolicy.setInitialRedeliveryDelay(initialRedeliveryDelayMillis);
brokerRedeliveryPolicy.setMaximumRedeliveries(maxBrokerRedeliveries); brokerRedeliveryPolicy.setMaximumRedeliveries(maxBrokerRedeliveries);
RedeliveryPolicyMap redeliveryPolicyMap = new RedeliveryPolicyMap(); RedeliveryPolicyMap redeliveryPolicyMap = new RedeliveryPolicyMap();