mirror of https://github.com/apache/activemq.git
applied patch from Christopher G. Stach II for AMQ-747 to allow redelivery backoff to add a collision avoidence capability. Many thanks!
git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@415300 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
87dcdce239
commit
4c4f86a0d8
|
@ -754,12 +754,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
|
||||||
// stop the delivery of messages.
|
// stop the delivery of messages.
|
||||||
unconsumedMessages.stop();
|
unconsumedMessages.stop();
|
||||||
// Start up the delivery again a little later.
|
// Start up the delivery again a little later.
|
||||||
if(redeliveryDelay==0){
|
redeliveryDelay = redeliveryPolicy.getRedeliveryDelay(redeliveryDelay);
|
||||||
redeliveryDelay=redeliveryPolicy.getInitialRedeliveryDelay();
|
|
||||||
}else{
|
|
||||||
if(redeliveryPolicy.isUseExponentialBackOff())
|
|
||||||
redeliveryDelay*=redeliveryPolicy.getBackOffMultiplier();
|
|
||||||
}
|
|
||||||
Scheduler.executeAfterDelay(new Runnable(){
|
Scheduler.executeAfterDelay(new Runnable(){
|
||||||
public void run(){
|
public void run(){
|
||||||
try{
|
try{
|
||||||
|
|
|
@ -724,12 +724,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
|
||||||
// Figure out how long we should wait to resend this message.
|
// Figure out how long we should wait to resend this message.
|
||||||
long redeliveryDelay=0;
|
long redeliveryDelay=0;
|
||||||
for( int i=0; i < redeliveryCounter; i++) {
|
for( int i=0; i < redeliveryCounter; i++) {
|
||||||
if (redeliveryDelay == 0) {
|
redeliveryDelay = redeliveryPolicy.getRedeliveryDelay(redeliveryDelay);
|
||||||
redeliveryDelay = redeliveryPolicy.getInitialRedeliveryDelay();
|
|
||||||
} else {
|
|
||||||
if (redeliveryPolicy.isUseExponentialBackOff())
|
|
||||||
redeliveryDelay *= redeliveryPolicy.getBackOffMultiplier();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
Scheduler.executeAfterDelay(new Runnable() {
|
Scheduler.executeAfterDelay(new Runnable() {
|
||||||
|
|
|
@ -17,6 +17,7 @@
|
||||||
package org.apache.activemq;
|
package org.apache.activemq;
|
||||||
|
|
||||||
import java.io.Serializable;
|
import java.io.Serializable;
|
||||||
|
import java.util.Random;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Configuration options used to control how messages are re-delivered when they
|
* Configuration options used to control how messages are re-delivered when they
|
||||||
|
@ -26,8 +27,12 @@ import java.io.Serializable;
|
||||||
*/
|
*/
|
||||||
public class RedeliveryPolicy implements Cloneable, Serializable {
|
public class RedeliveryPolicy implements Cloneable, Serializable {
|
||||||
|
|
||||||
|
// +/-15% for a 30% spread -cgs
|
||||||
|
protected double collisionAvoidanceFactor = 0.15d;
|
||||||
protected int maximumRedeliveries = 5;
|
protected int maximumRedeliveries = 5;
|
||||||
protected long initialRedeliveryDelay = 1000L;
|
protected long initialRedeliveryDelay = 1000L;
|
||||||
|
protected static Random randomNumberGenerator;
|
||||||
|
protected boolean useCollisionAvoidance = false;
|
||||||
protected boolean useExponentialBackOff = false;
|
protected boolean useExponentialBackOff = false;
|
||||||
protected short backOffMultiplier = 5;
|
protected short backOffMultiplier = 5;
|
||||||
|
|
||||||
|
@ -51,6 +56,14 @@ public class RedeliveryPolicy implements Cloneable, Serializable {
|
||||||
this.backOffMultiplier = backOffMultiplier;
|
this.backOffMultiplier = backOffMultiplier;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public short getCollisionAvoidancePercent() {
|
||||||
|
return (short) Math.round(collisionAvoidanceFactor * 100);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setCollisionAvoidancePercent(short collisionAvoidancePercent) {
|
||||||
|
this.collisionAvoidanceFactor = collisionAvoidancePercent * 0.01d;
|
||||||
|
}
|
||||||
|
|
||||||
public long getInitialRedeliveryDelay() {
|
public long getInitialRedeliveryDelay() {
|
||||||
return initialRedeliveryDelay;
|
return initialRedeliveryDelay;
|
||||||
}
|
}
|
||||||
|
@ -67,6 +80,41 @@ public class RedeliveryPolicy implements Cloneable, Serializable {
|
||||||
this.maximumRedeliveries = maximumRedeliveries;
|
this.maximumRedeliveries = maximumRedeliveries;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public long getRedeliveryDelay(long previousDelay) {
|
||||||
|
long redeliveryDelay;
|
||||||
|
|
||||||
|
if (previousDelay == 0) {
|
||||||
|
redeliveryDelay = initialRedeliveryDelay;
|
||||||
|
} else if (useExponentialBackOff && backOffMultiplier > 1) {
|
||||||
|
redeliveryDelay = previousDelay * backOffMultiplier;
|
||||||
|
} else {
|
||||||
|
redeliveryDelay = previousDelay;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (useCollisionAvoidance) {
|
||||||
|
if (randomNumberGenerator == null) {
|
||||||
|
initRandomNumberGenerator();
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* First random determines +/-, second random determines how far to
|
||||||
|
* go in that direction. -cgs
|
||||||
|
*/
|
||||||
|
double variance = (randomNumberGenerator.nextBoolean() ? collisionAvoidanceFactor : -collisionAvoidanceFactor) * randomNumberGenerator.nextDouble();
|
||||||
|
redeliveryDelay += redeliveryDelay * variance;
|
||||||
|
}
|
||||||
|
|
||||||
|
return redeliveryDelay;
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean isUseCollisionAvoidance() {
|
||||||
|
return useCollisionAvoidance;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setUseCollisionAvoidance(boolean useCollisionAvoidance) {
|
||||||
|
this.useCollisionAvoidance = useCollisionAvoidance;
|
||||||
|
}
|
||||||
|
|
||||||
public boolean isUseExponentialBackOff() {
|
public boolean isUseExponentialBackOff() {
|
||||||
return useExponentialBackOff;
|
return useExponentialBackOff;
|
||||||
}
|
}
|
||||||
|
@ -74,4 +122,11 @@ public class RedeliveryPolicy implements Cloneable, Serializable {
|
||||||
public void setUseExponentialBackOff(boolean useExponentialBackOff) {
|
public void setUseExponentialBackOff(boolean useExponentialBackOff) {
|
||||||
this.useExponentialBackOff = useExponentialBackOff;
|
this.useExponentialBackOff = useExponentialBackOff;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected static synchronized void initRandomNumberGenerator() {
|
||||||
|
if (randomNumberGenerator == null) {
|
||||||
|
randomNumberGenerator = new Random();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue