mirror of https://github.com/apache/activemq.git
AMQ-8617: RedeliveryPolicy:Exponential Backoff + NonBlockingRedelivery = too long delays
Scenario on client: 1. Employing RedeliveryPolicy with exponential backoff (keeping maximum redeliveries at default 6) 2. Enabled non-blocking redelivery 3. Receiving e.g. 100 consecutive poison messages (which eventually should DLQ after max redeliveries) This will result in massive redelivery delays due to a logic bug. The reason is that redeliveryDelay is a field variable kept on the ActiveMQMessageConsumer, instead of being a property on the message - or that the redelivery delay was calculated per message based on the redelivery count. When consecutive messages rollbacks multiple times, the redeliveryDelay field is continuously multiplied by the backoff multiplier, resulting in enormous delays. Fix: Ditch the field variable, instead calculating the redeliveryDelay per delivery from the redelivery count. (This happens to be identical to how it is done in afterRollback() in ActiveMQSession:1004.) Test is added - which fails with the previous code, and passes with this. Added a debug log line for the calculated delay.
This commit is contained in:
parent
9b1eb96b83
commit
393a696955
|
@ -139,7 +139,6 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
|
||||||
private PreviouslyDeliveredMap<MessageId, PreviouslyDelivered> previouslyDeliveredMessages;
|
private PreviouslyDeliveredMap<MessageId, PreviouslyDelivered> previouslyDeliveredMessages;
|
||||||
private int deliveredCounter;
|
private int deliveredCounter;
|
||||||
private int additionalWindowSize;
|
private int additionalWindowSize;
|
||||||
private long redeliveryDelay;
|
|
||||||
private int ackCounter;
|
private int ackCounter;
|
||||||
private int dispatchedCount;
|
private int dispatchedCount;
|
||||||
private final AtomicReference<MessageListener> messageListener = new AtomicReference<MessageListener>();
|
private final AtomicReference<MessageListener> messageListener = new AtomicReference<MessageListener>();
|
||||||
|
@ -1224,7 +1223,6 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
|
||||||
deliveredMessages.clear();
|
deliveredMessages.clear();
|
||||||
clearPreviouslyDelivered();
|
clearPreviouslyDelivered();
|
||||||
}
|
}
|
||||||
redeliveryDelay = 0;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public void rollback() throws JMSException {
|
public void rollback() throws JMSException {
|
||||||
|
@ -1249,14 +1247,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
// use initial delay for first redelivery
|
|
||||||
MessageDispatch lastMd = deliveredMessages.getFirst();
|
MessageDispatch lastMd = deliveredMessages.getFirst();
|
||||||
final int currentRedeliveryCount = lastMd.getMessage().getRedeliveryCounter();
|
|
||||||
if (currentRedeliveryCount > 0) {
|
|
||||||
redeliveryDelay = redeliveryPolicy.getNextRedeliveryDelay(redeliveryDelay);
|
|
||||||
} else {
|
|
||||||
redeliveryDelay = redeliveryPolicy.getInitialRedeliveryDelay();
|
|
||||||
}
|
|
||||||
MessageId firstMsgId = deliveredMessages.getLast().getMessage().getMessageId();
|
MessageId firstMsgId = deliveredMessages.getLast().getMessage().getMessageId();
|
||||||
|
|
||||||
for (Iterator<MessageDispatch> iter = deliveredMessages.iterator(); iter.hasNext();) {
|
for (Iterator<MessageDispatch> iter = deliveredMessages.iterator(); iter.hasNext();) {
|
||||||
|
@ -1279,12 +1270,21 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
|
||||||
session.sendAck(ack,true);
|
session.sendAck(ack,true);
|
||||||
// Adjust the window size.
|
// Adjust the window size.
|
||||||
additionalWindowSize = Math.max(0, additionalWindowSize - deliveredMessages.size());
|
additionalWindowSize = Math.max(0, additionalWindowSize - deliveredMessages.size());
|
||||||
redeliveryDelay = 0;
|
|
||||||
|
|
||||||
deliveredCounter -= deliveredMessages.size();
|
deliveredCounter -= deliveredMessages.size();
|
||||||
deliveredMessages.clear();
|
deliveredMessages.clear();
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
|
// Find what redelivery delay to use, based on the redelivery count of last message.
|
||||||
|
// Current redelivery count is already increased at this point
|
||||||
|
final int currentRedeliveryCount = lastMd.getMessage().getRedeliveryCounter();
|
||||||
|
long redeliveryDelay = redeliveryPolicy.getInitialRedeliveryDelay();
|
||||||
|
// Iterating based on redelivery count to find delay to use.
|
||||||
|
// NOTE: One less than current redelivery count, to use initial delay for first redelivery.
|
||||||
|
for (int i = 0; i < (currentRedeliveryCount-1); i++) {
|
||||||
|
redeliveryDelay = redeliveryPolicy.getNextRedeliveryDelay(redeliveryDelay);
|
||||||
|
}
|
||||||
|
LOG.debug("Redelivery delay calculated for redelivery count {}: {}, for messageId '{}'.", currentRedeliveryCount, redeliveryDelay, lastMd.getMessage().getMessageId());
|
||||||
|
|
||||||
// only redelivery_ack after first delivery
|
// only redelivery_ack after first delivery
|
||||||
if (currentRedeliveryCount > 0) {
|
if (currentRedeliveryCount > 0) {
|
||||||
|
|
|
@ -56,7 +56,7 @@ public class RedeliveryPolicyTest extends JmsTestSupport {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
public void testGetNext() throws Exception {
|
public void testGetNextWithExponentialBackoff() throws Exception {
|
||||||
|
|
||||||
RedeliveryPolicy policy = new RedeliveryPolicy();
|
RedeliveryPolicy policy = new RedeliveryPolicy();
|
||||||
policy.setInitialRedeliveryDelay(0);
|
policy.setInitialRedeliveryDelay(0);
|
||||||
|
@ -76,6 +76,34 @@ public class RedeliveryPolicyTest extends JmsTestSupport {
|
||||||
assertEquals(500, delay);
|
assertEquals(500, delay);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void testGetNextWithExponentialBackoff_RedeliveryDelayIsIgnoredIfInitialRedeliveryDelayAboveZero() {
|
||||||
|
|
||||||
|
RedeliveryPolicy policy = new RedeliveryPolicy();
|
||||||
|
policy.setInitialRedeliveryDelay(42);
|
||||||
|
policy.setRedeliveryDelay(-100); // This is ignored in actual usage since initial > 0
|
||||||
|
policy.setBackOffMultiplier(2d);
|
||||||
|
policy.setUseExponentialBackOff(true);
|
||||||
|
|
||||||
|
// Invoke in the order employed when actually used by redelivery code paths
|
||||||
|
long delay = policy.getInitialRedeliveryDelay();
|
||||||
|
assertEquals(42, delay);
|
||||||
|
// Notice how the setRedeliveryDelay(-100) doesn't affect the calculation if initial > 0
|
||||||
|
delay = policy.getNextRedeliveryDelay(delay);
|
||||||
|
assertEquals(42*2, delay);
|
||||||
|
delay = policy.getNextRedeliveryDelay(delay);
|
||||||
|
assertEquals(42*4, delay);
|
||||||
|
|
||||||
|
// If the initial delay is 0, when given back to the policy via getNextRedeliveryDelay(), we get -100
|
||||||
|
assertEquals(-100, policy.getNextRedeliveryDelay(0));
|
||||||
|
// .. but when invoked with anything else, the backoff multiplier is used
|
||||||
|
assertEquals(123 * 2, policy.getNextRedeliveryDelay(123));
|
||||||
|
|
||||||
|
// If exponential backoff is disabled, the setRedeliveryDelay(-100) is used.
|
||||||
|
policy.setUseExponentialBackOff(false);
|
||||||
|
delay = policy.getNextRedeliveryDelay(delay);
|
||||||
|
assertEquals(-100, delay);
|
||||||
|
}
|
||||||
|
|
||||||
public void testGetNextWithInitialDelay() throws Exception {
|
public void testGetNextWithInitialDelay() throws Exception {
|
||||||
|
|
||||||
RedeliveryPolicy policy = new RedeliveryPolicy();
|
RedeliveryPolicy policy = new RedeliveryPolicy();
|
||||||
|
@ -87,7 +115,6 @@ public class RedeliveryPolicyTest extends JmsTestSupport {
|
||||||
assertEquals(1000, delay);
|
assertEquals(1000, delay);
|
||||||
delay = policy.getNextRedeliveryDelay(delay);
|
delay = policy.getNextRedeliveryDelay(delay);
|
||||||
assertEquals(1000, delay);
|
assertEquals(1000, delay);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -145,6 +172,71 @@ public class RedeliveryPolicyTest extends JmsTestSupport {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* By version 5.17.1 (2022-04-25), the combination of exponential redelivery with non-blocking redelivery was
|
||||||
|
* handled erroneously: The redeliveryDelay was a modifiable field on the ActiveMQMessageConsumer (not per message,
|
||||||
|
* nor calculated individually based on the message's redelivery count), and thus if multiple consecutive messages
|
||||||
|
* was rolled back multiple times in a row (i.e. redeliveries > 1), the exponential delay <i>which was kept on the
|
||||||
|
* consumer</i> would quickly result in extreme delays.
|
||||||
|
*/
|
||||||
|
public void testExponentialRedeliveryPolicyCombinedWithNonBlockingRedelivery() throws Exception {
|
||||||
|
// :: ARRANGE
|
||||||
|
// Condition #1: Create an exponential redelivery policy
|
||||||
|
RedeliveryPolicy policy = connection.getRedeliveryPolicy();
|
||||||
|
policy.setInitialRedeliveryDelay(0);
|
||||||
|
policy.setRedeliveryDelay(100);
|
||||||
|
policy.setBackOffMultiplier(2);
|
||||||
|
policy.setUseExponentialBackOff(true);
|
||||||
|
policy.setMaximumRedeliveries(4); // 5 attempts: 1 delivery + 4 redeliveries
|
||||||
|
|
||||||
|
// assert set of delays
|
||||||
|
long delay = policy.getInitialRedeliveryDelay();
|
||||||
|
assertEquals(0, delay);
|
||||||
|
delay = policy.getNextRedeliveryDelay(delay);
|
||||||
|
assertEquals(100, delay);
|
||||||
|
delay = policy.getNextRedeliveryDelay(delay);
|
||||||
|
assertEquals(200, delay);
|
||||||
|
delay = policy.getNextRedeliveryDelay(delay);
|
||||||
|
assertEquals(400, delay);
|
||||||
|
|
||||||
|
// Condition #2: Set non-blocking redelivery
|
||||||
|
connection.setNonBlockingRedelivery(true);
|
||||||
|
|
||||||
|
// :: ACT
|
||||||
|
connection.start();
|
||||||
|
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
|
||||||
|
ActiveMQQueue destination = new ActiveMQQueue(getName());
|
||||||
|
MessageProducer producer = session.createProducer(destination);
|
||||||
|
|
||||||
|
MessageConsumer consumer = session.createConsumer(destination);
|
||||||
|
|
||||||
|
// Send 'count' messages
|
||||||
|
int count = 10;
|
||||||
|
for (int i = 0; i<count; i++) {
|
||||||
|
producer.send(session.createTextMessage("#"+i));
|
||||||
|
}
|
||||||
|
session.commit();
|
||||||
|
LOG.info("{} messages sent", count);
|
||||||
|
|
||||||
|
// Receive messages, but rollback: 4+1 times each message = 5 * count
|
||||||
|
int receiveCount = 0;
|
||||||
|
// Add one extra receive, which should NOT result in a message (they should all be DLQed by then).
|
||||||
|
for (int i = 0; i < (count * 5 + 1); i++) {
|
||||||
|
// Max delay between redeliveries for these messages should be 400ms
|
||||||
|
// Waiting for 4x that = 1600 ms, to allow for hiccups during testing.
|
||||||
|
// (With the faulty code, the last calculated delay before test failing was 26214400.)
|
||||||
|
TextMessage m = (TextMessage) consumer.receive(1600);
|
||||||
|
LOG.info("Message received: {}", m);
|
||||||
|
if (m != null) {
|
||||||
|
receiveCount ++;
|
||||||
|
}
|
||||||
|
session.rollback();
|
||||||
|
}
|
||||||
|
|
||||||
|
// ASSERT
|
||||||
|
// We should have received count * 5 messages
|
||||||
|
assertEquals(count * 5, receiveCount);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @throws Exception
|
* @throws Exception
|
||||||
|
|
Loading…
Reference in New Issue