mirror of https://github.com/apache/activemq.git
resolve: fix https://issues.apache.org/activemq/browse/AMQ-1847 - set initialRedeliveryDelay=0 to get non delayed first redelivery, use deliveryDelay to set the base for subsequent redelivery. existing users with an initialRedeliveryDelay set will now see it being respected.
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@982903 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
6b81869fc6
commit
c23f9e6584
|
@ -1099,11 +1099,13 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Only increase the redelivery delay after the first redelivery..
|
// use initial delay for first redelivery
|
||||||
MessageDispatch lastMd = deliveredMessages.getFirst();
|
MessageDispatch lastMd = deliveredMessages.getFirst();
|
||||||
final int currentRedeliveryCount = lastMd.getMessage().getRedeliveryCounter();
|
final int currentRedeliveryCount = lastMd.getMessage().getRedeliveryCounter();
|
||||||
if (currentRedeliveryCount > 0) {
|
if (currentRedeliveryCount > 0) {
|
||||||
redeliveryDelay = redeliveryPolicy.getRedeliveryDelay(redeliveryDelay);
|
redeliveryDelay = redeliveryPolicy.getNextRedeliveryDelay(redeliveryDelay);
|
||||||
|
} else {
|
||||||
|
redeliveryDelay = redeliveryPolicy.getInitialRedeliveryDelay();
|
||||||
}
|
}
|
||||||
MessageId firstMsgId = deliveredMessages.getLast().getMessage().getMessageId();
|
MessageId firstMsgId = deliveredMessages.getLast().getMessage().getMessageId();
|
||||||
|
|
||||||
|
|
|
@ -874,9 +874,9 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
|
||||||
|
|
||||||
// Figure out how long we should wait to resend
|
// Figure out how long we should wait to resend
|
||||||
// this message.
|
// this message.
|
||||||
long redeliveryDelay = 0;
|
long redeliveryDelay = redeliveryPolicy.getInitialRedeliveryDelay();
|
||||||
for (int i = 0; i < redeliveryCounter; i++) {
|
for (int i = 0; i < redeliveryCounter; i++) {
|
||||||
redeliveryDelay = redeliveryPolicy.getRedeliveryDelay(redeliveryDelay);
|
redeliveryDelay = redeliveryPolicy.getNextRedeliveryDelay(redeliveryDelay);
|
||||||
}
|
}
|
||||||
scheduler.executeAfterDelay(new Runnable() {
|
scheduler.executeAfterDelay(new Runnable() {
|
||||||
|
|
||||||
|
|
|
@ -38,6 +38,7 @@ public class RedeliveryPolicy implements Cloneable, Serializable {
|
||||||
private boolean useCollisionAvoidance;
|
private boolean useCollisionAvoidance;
|
||||||
private boolean useExponentialBackOff;
|
private boolean useExponentialBackOff;
|
||||||
private double backOffMultiplier = 5.0;
|
private double backOffMultiplier = 5.0;
|
||||||
|
private long redeliveryDelay = initialRedeliveryDelay;
|
||||||
|
|
||||||
public RedeliveryPolicy() {
|
public RedeliveryPolicy() {
|
||||||
}
|
}
|
||||||
|
@ -82,15 +83,15 @@ public class RedeliveryPolicy implements Cloneable, Serializable {
|
||||||
this.maximumRedeliveries = maximumRedeliveries;
|
this.maximumRedeliveries = maximumRedeliveries;
|
||||||
}
|
}
|
||||||
|
|
||||||
public long getRedeliveryDelay(long previousDelay) {
|
public long getNextRedeliveryDelay(long previousDelay) {
|
||||||
long redeliveryDelay;
|
long nextDelay;
|
||||||
|
|
||||||
if (previousDelay == 0) {
|
if (previousDelay == 0) {
|
||||||
redeliveryDelay = initialRedeliveryDelay;
|
nextDelay = redeliveryDelay;
|
||||||
} else if (useExponentialBackOff && backOffMultiplier > 1) {
|
} else if (useExponentialBackOff && backOffMultiplier > 1) {
|
||||||
redeliveryDelay = (long) (previousDelay * backOffMultiplier);
|
nextDelay = (long) (previousDelay * backOffMultiplier);
|
||||||
} else {
|
} else {
|
||||||
redeliveryDelay = previousDelay;
|
nextDelay = previousDelay;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (useCollisionAvoidance) {
|
if (useCollisionAvoidance) {
|
||||||
|
@ -100,10 +101,10 @@ public class RedeliveryPolicy implements Cloneable, Serializable {
|
||||||
*/
|
*/
|
||||||
Random random = getRandomNumberGenerator();
|
Random random = getRandomNumberGenerator();
|
||||||
double variance = (random.nextBoolean() ? collisionAvoidanceFactor : -collisionAvoidanceFactor) * random.nextDouble();
|
double variance = (random.nextBoolean() ? collisionAvoidanceFactor : -collisionAvoidanceFactor) * random.nextDouble();
|
||||||
redeliveryDelay += redeliveryDelay * variance;
|
nextDelay += nextDelay * variance;
|
||||||
}
|
}
|
||||||
|
|
||||||
return redeliveryDelay;
|
return nextDelay;
|
||||||
}
|
}
|
||||||
|
|
||||||
public boolean isUseCollisionAvoidance() {
|
public boolean isUseCollisionAvoidance() {
|
||||||
|
@ -129,4 +130,11 @@ public class RedeliveryPolicy implements Cloneable, Serializable {
|
||||||
return randomNumberGenerator;
|
return randomNumberGenerator;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void setRedeliveryDelay(long redeliveryDelay) {
|
||||||
|
this.redeliveryDelay = redeliveryDelay;
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getRedeliveryDelay() {
|
||||||
|
return redeliveryDelay;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -62,7 +62,8 @@ public class MessageListenerRedeliveryTest extends TestCase {
|
||||||
|
|
||||||
protected RedeliveryPolicy getRedeliveryPolicy() {
|
protected RedeliveryPolicy getRedeliveryPolicy() {
|
||||||
RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
|
RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
|
||||||
redeliveryPolicy.setInitialRedeliveryDelay(1000);
|
redeliveryPolicy.setInitialRedeliveryDelay(0);
|
||||||
|
redeliveryPolicy.setRedeliveryDelay(1000);
|
||||||
redeliveryPolicy.setMaximumRedeliveries(3);
|
redeliveryPolicy.setMaximumRedeliveries(3);
|
||||||
redeliveryPolicy.setBackOffMultiplier((short)2);
|
redeliveryPolicy.setBackOffMultiplier((short)2);
|
||||||
redeliveryPolicy.setUseExponentialBackOff(true);
|
redeliveryPolicy.setUseExponentialBackOff(true);
|
||||||
|
|
|
@ -47,7 +47,8 @@ public class RedeliveryPolicyTest extends JmsTestSupport {
|
||||||
|
|
||||||
// Receive a message with the JMS API
|
// Receive a message with the JMS API
|
||||||
RedeliveryPolicy policy = connection.getRedeliveryPolicy();
|
RedeliveryPolicy policy = connection.getRedeliveryPolicy();
|
||||||
policy.setInitialRedeliveryDelay(500);
|
policy.setInitialRedeliveryDelay(0);
|
||||||
|
policy.setRedeliveryDelay(500);
|
||||||
policy.setBackOffMultiplier((short) 2);
|
policy.setBackOffMultiplier((short) 2);
|
||||||
policy.setUseExponentialBackOff(true);
|
policy.setUseExponentialBackOff(true);
|
||||||
|
|
||||||
|
@ -102,8 +103,9 @@ public class RedeliveryPolicyTest extends JmsTestSupport {
|
||||||
|
|
||||||
// Receive a message with the JMS API
|
// Receive a message with the JMS API
|
||||||
RedeliveryPolicy policy = connection.getRedeliveryPolicy();
|
RedeliveryPolicy policy = connection.getRedeliveryPolicy();
|
||||||
policy.setInitialRedeliveryDelay(500);
|
policy.setInitialRedeliveryDelay(0);
|
||||||
|
policy.setRedeliveryDelay(500);
|
||||||
|
|
||||||
connection.start();
|
connection.start();
|
||||||
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
|
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
|
||||||
ActiveMQQueue destination = new ActiveMQQueue(getName());
|
ActiveMQQueue destination = new ActiveMQQueue(getName());
|
||||||
|
@ -303,5 +305,128 @@ public class RedeliveryPolicyTest extends JmsTestSupport {
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
public void testInitialRedeliveryDelayZero() throws Exception {
|
||||||
|
|
||||||
|
// Receive a message with the JMS API
|
||||||
|
RedeliveryPolicy policy = connection.getRedeliveryPolicy();
|
||||||
|
policy.setInitialRedeliveryDelay(0);
|
||||||
|
policy.setUseExponentialBackOff(false);
|
||||||
|
policy.setMaximumRedeliveries(1);
|
||||||
|
|
||||||
|
connection.start();
|
||||||
|
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
|
||||||
|
ActiveMQQueue destination = new ActiveMQQueue("TEST");
|
||||||
|
MessageProducer producer = session.createProducer(destination);
|
||||||
|
|
||||||
|
MessageConsumer consumer = session.createConsumer(destination);
|
||||||
|
|
||||||
|
// Send the messages
|
||||||
|
producer.send(session.createTextMessage("1st"));
|
||||||
|
producer.send(session.createTextMessage("2nd"));
|
||||||
|
session.commit();
|
||||||
|
|
||||||
|
TextMessage m;
|
||||||
|
m = (TextMessage)consumer.receive(100);
|
||||||
|
assertNotNull(m);
|
||||||
|
assertEquals("1st", m.getText());
|
||||||
|
session.rollback();
|
||||||
|
|
||||||
|
m = (TextMessage)consumer.receive(100);
|
||||||
|
assertNotNull(m);
|
||||||
|
assertEquals("1st", m.getText());
|
||||||
|
|
||||||
|
m = (TextMessage)consumer.receive(100);
|
||||||
|
assertNotNull(m);
|
||||||
|
assertEquals("2nd", m.getText());
|
||||||
|
session.commit();
|
||||||
|
|
||||||
|
session.commit();
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
public void testInitialRedeliveryDelayOne() throws Exception {
|
||||||
|
|
||||||
|
// Receive a message with the JMS API
|
||||||
|
RedeliveryPolicy policy = connection.getRedeliveryPolicy();
|
||||||
|
policy.setInitialRedeliveryDelay(1000);
|
||||||
|
policy.setUseExponentialBackOff(false);
|
||||||
|
policy.setMaximumRedeliveries(1);
|
||||||
|
|
||||||
|
connection.start();
|
||||||
|
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
|
||||||
|
ActiveMQQueue destination = new ActiveMQQueue("TEST");
|
||||||
|
MessageProducer producer = session.createProducer(destination);
|
||||||
|
|
||||||
|
MessageConsumer consumer = session.createConsumer(destination);
|
||||||
|
|
||||||
|
// Send the messages
|
||||||
|
producer.send(session.createTextMessage("1st"));
|
||||||
|
producer.send(session.createTextMessage("2nd"));
|
||||||
|
session.commit();
|
||||||
|
|
||||||
|
TextMessage m;
|
||||||
|
m = (TextMessage)consumer.receive(100);
|
||||||
|
assertNotNull(m);
|
||||||
|
assertEquals("1st", m.getText());
|
||||||
|
session.rollback();
|
||||||
|
|
||||||
|
m = (TextMessage)consumer.receive(100);
|
||||||
|
assertNull(m);
|
||||||
|
|
||||||
|
m = (TextMessage)consumer.receive(2000);
|
||||||
|
assertNotNull(m);
|
||||||
|
assertEquals("1st", m.getText());
|
||||||
|
|
||||||
|
m = (TextMessage)consumer.receive(100);
|
||||||
|
assertNotNull(m);
|
||||||
|
assertEquals("2nd", m.getText());
|
||||||
|
session.commit();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testRedeliveryDelayOne() throws Exception {
|
||||||
|
|
||||||
|
// Receive a message with the JMS API
|
||||||
|
RedeliveryPolicy policy = connection.getRedeliveryPolicy();
|
||||||
|
policy.setInitialRedeliveryDelay(0);
|
||||||
|
policy.setRedeliveryDelay(1000);
|
||||||
|
policy.setUseExponentialBackOff(false);
|
||||||
|
policy.setMaximumRedeliveries(2);
|
||||||
|
|
||||||
|
connection.start();
|
||||||
|
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
|
||||||
|
ActiveMQQueue destination = new ActiveMQQueue("TEST");
|
||||||
|
MessageProducer producer = session.createProducer(destination);
|
||||||
|
|
||||||
|
MessageConsumer consumer = session.createConsumer(destination);
|
||||||
|
|
||||||
|
// Send the messages
|
||||||
|
producer.send(session.createTextMessage("1st"));
|
||||||
|
producer.send(session.createTextMessage("2nd"));
|
||||||
|
session.commit();
|
||||||
|
|
||||||
|
TextMessage m;
|
||||||
|
m = (TextMessage)consumer.receive(100);
|
||||||
|
assertNotNull(m);
|
||||||
|
assertEquals("1st", m.getText());
|
||||||
|
session.rollback();
|
||||||
|
|
||||||
|
m = (TextMessage)consumer.receive(100);
|
||||||
|
assertNotNull("first immediate redelivery", m);
|
||||||
|
session.rollback();
|
||||||
|
|
||||||
|
m = (TextMessage)consumer.receive(100);
|
||||||
|
assertNull("second delivery delayed: " + m, m);
|
||||||
|
|
||||||
|
m = (TextMessage)consumer.receive(2000);
|
||||||
|
assertNotNull(m);
|
||||||
|
assertEquals("1st", m.getText());
|
||||||
|
|
||||||
|
m = (TextMessage)consumer.receive(100);
|
||||||
|
assertNotNull(m);
|
||||||
|
assertEquals("2nd", m.getText());
|
||||||
|
session.commit();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -57,7 +57,7 @@ public class AMQ2021Test extends TestCase implements ExceptionListener, Uncaught
|
||||||
AMQ2021Test testCase;
|
AMQ2021Test testCase;
|
||||||
|
|
||||||
String ACTIVEMQ_BROKER_BIND = "tcp://localhost:61616";
|
String ACTIVEMQ_BROKER_BIND = "tcp://localhost:61616";
|
||||||
String ACTIVEMQ_BROKER_URL = ACTIVEMQ_BROKER_BIND + "?jms.redeliveryPolicy.maximumRedeliveries=1";
|
String ACTIVEMQ_BROKER_URL = ACTIVEMQ_BROKER_BIND + "?jms.redeliveryPolicy.maximumRedeliveries=1&jms.redeliveryPolicy.initialRedeliveryDelay=0";
|
||||||
|
|
||||||
private int numMessages = 1000;
|
private int numMessages = 1000;
|
||||||
private int numConsumers = 2;
|
private int numConsumers = 2;
|
||||||
|
|
Loading…
Reference in New Issue