From 4c4f86a0d8523eb9fca630b181ce2b90e28940a6 Mon Sep 17 00:00:00 2001 From: James Strachan Date: Mon, 19 Jun 2006 11:55:44 +0000 Subject: [PATCH] applied patch from Christopher G. Stach II for AMQ-747 to allow redelivery backoff to add a collision avoidence capability. Many thanks! git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@415300 13f79535-47bb-0310-9956-ffa450edef68 --- .../activemq/ActiveMQMessageConsumer.java | 7 +-- .../org/apache/activemq/ActiveMQSession.java | 7 +-- .../org/apache/activemq/RedeliveryPolicy.java | 55 +++++++++++++++++++ 3 files changed, 57 insertions(+), 12 deletions(-) diff --git a/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java b/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java index d35319a03b..724340251a 100755 --- a/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java +++ b/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java @@ -754,12 +754,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC // stop the delivery of messages. unconsumedMessages.stop(); // Start up the delivery again a little later. - if(redeliveryDelay==0){ - redeliveryDelay=redeliveryPolicy.getInitialRedeliveryDelay(); - }else{ - if(redeliveryPolicy.isUseExponentialBackOff()) - redeliveryDelay*=redeliveryPolicy.getBackOffMultiplier(); - } + redeliveryDelay = redeliveryPolicy.getRedeliveryDelay(redeliveryDelay); Scheduler.executeAfterDelay(new Runnable(){ public void run(){ try{ diff --git a/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java b/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java index d3f3b6d82d..2487b77f1a 100755 --- a/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java +++ b/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java @@ -724,12 +724,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta // Figure out how long we should wait to resend this message. long redeliveryDelay=0; for( int i=0; i < redeliveryCounter; i++) { - if (redeliveryDelay == 0) { - redeliveryDelay = redeliveryPolicy.getInitialRedeliveryDelay(); - } else { - if (redeliveryPolicy.isUseExponentialBackOff()) - redeliveryDelay *= redeliveryPolicy.getBackOffMultiplier(); - } + redeliveryDelay = redeliveryPolicy.getRedeliveryDelay(redeliveryDelay); } Scheduler.executeAfterDelay(new Runnable() { diff --git a/activemq-core/src/main/java/org/apache/activemq/RedeliveryPolicy.java b/activemq-core/src/main/java/org/apache/activemq/RedeliveryPolicy.java index df0a8d66cc..f6314f30d6 100644 --- a/activemq-core/src/main/java/org/apache/activemq/RedeliveryPolicy.java +++ b/activemq-core/src/main/java/org/apache/activemq/RedeliveryPolicy.java @@ -17,6 +17,7 @@ package org.apache.activemq; import java.io.Serializable; +import java.util.Random; /** * Configuration options used to control how messages are re-delivered when they @@ -26,8 +27,12 @@ import java.io.Serializable; */ public class RedeliveryPolicy implements Cloneable, Serializable { + // +/-15% for a 30% spread -cgs + protected double collisionAvoidanceFactor = 0.15d; protected int maximumRedeliveries = 5; protected long initialRedeliveryDelay = 1000L; + protected static Random randomNumberGenerator; + protected boolean useCollisionAvoidance = false; protected boolean useExponentialBackOff = false; protected short backOffMultiplier = 5; @@ -51,6 +56,14 @@ public class RedeliveryPolicy implements Cloneable, Serializable { this.backOffMultiplier = backOffMultiplier; } + public short getCollisionAvoidancePercent() { + return (short) Math.round(collisionAvoidanceFactor * 100); + } + + public void setCollisionAvoidancePercent(short collisionAvoidancePercent) { + this.collisionAvoidanceFactor = collisionAvoidancePercent * 0.01d; + } + public long getInitialRedeliveryDelay() { return initialRedeliveryDelay; } @@ -67,6 +80,41 @@ public class RedeliveryPolicy implements Cloneable, Serializable { this.maximumRedeliveries = maximumRedeliveries; } + public long getRedeliveryDelay(long previousDelay) { + long redeliveryDelay; + + if (previousDelay == 0) { + redeliveryDelay = initialRedeliveryDelay; + } else if (useExponentialBackOff && backOffMultiplier > 1) { + redeliveryDelay = previousDelay * backOffMultiplier; + } else { + redeliveryDelay = previousDelay; + } + + if (useCollisionAvoidance) { + if (randomNumberGenerator == null) { + initRandomNumberGenerator(); + } + + /* + * First random determines +/-, second random determines how far to + * go in that direction. -cgs + */ + double variance = (randomNumberGenerator.nextBoolean() ? collisionAvoidanceFactor : -collisionAvoidanceFactor) * randomNumberGenerator.nextDouble(); + redeliveryDelay += redeliveryDelay * variance; + } + + return redeliveryDelay; + } + + public boolean isUseCollisionAvoidance() { + return useCollisionAvoidance; + } + + public void setUseCollisionAvoidance(boolean useCollisionAvoidance) { + this.useCollisionAvoidance = useCollisionAvoidance; + } + public boolean isUseExponentialBackOff() { return useExponentialBackOff; } @@ -74,4 +122,11 @@ public class RedeliveryPolicy implements Cloneable, Serializable { public void setUseExponentialBackOff(boolean useExponentialBackOff) { this.useExponentialBackOff = useExponentialBackOff; } + + protected static synchronized void initRandomNumberGenerator() { + if (randomNumberGenerator == null) { + randomNumberGenerator = new Random(); + } + } + }