AMQ-8617: RedeliveryPolicy:Exponential Backoff + NonBlockingRedelivery = too long delays

(cherry picked from commit 393a696955)

Scenario on client:

1. Employing RedeliveryPolicy with exponential backoff (keeping maximum
redeliveries at default 6)
2. Enabled non-blocking redelivery
3. Receiving e.g. 100 consecutive poison messages (which eventually
should DLQ after max redeliveries)

This will result in massive redelivery delays due to a logic bug.

The reason is that redeliveryDelay is a field variable kept on the
ActiveMQMessageConsumer, instead of being a property on the message - or
that the redelivery delay was calculated per message based on the
redelivery count.

When consecutive messages rollbacks multiple times, the redeliveryDelay
field is continuously multiplied by the backoff multiplier, resulting in
enormous delays.

Fix: Ditch the field variable, instead calculating the redeliveryDelay
per delivery from the redelivery count. (This happens to be identical to
how it is done in afterRollback() in ActiveMQSession:1004.)

Test is added - which fails with the previous code, and passes with
this. Added a debug log line for the calculated delay.
This commit is contained in:
Endre Stølsvik 2022-05-31 01:56:30 +02:00 committed by Jean-Baptiste Onofré
parent 20dc305a49
commit 2de859f758
2 changed files with 104 additions and 12 deletions

View File

@ -139,7 +139,6 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
private PreviouslyDeliveredMap<MessageId, PreviouslyDelivered> previouslyDeliveredMessages;
private int deliveredCounter;
private int additionalWindowSize;
private long redeliveryDelay;
private int ackCounter;
private int dispatchedCount;
private final AtomicReference<MessageListener> messageListener = new AtomicReference<MessageListener>();
@ -1224,7 +1223,6 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
deliveredMessages.clear();
clearPreviouslyDelivered();
}
redeliveryDelay = 0;
}
public void rollback() throws JMSException {
@ -1249,14 +1247,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
return;
}
// use initial delay for first redelivery
MessageDispatch lastMd = deliveredMessages.getFirst();
final int currentRedeliveryCount = lastMd.getMessage().getRedeliveryCounter();
if (currentRedeliveryCount > 0) {
redeliveryDelay = redeliveryPolicy.getNextRedeliveryDelay(redeliveryDelay);
} else {
redeliveryDelay = redeliveryPolicy.getInitialRedeliveryDelay();
}
MessageId firstMsgId = deliveredMessages.getLast().getMessage().getMessageId();
for (Iterator<MessageDispatch> iter = deliveredMessages.iterator(); iter.hasNext();) {
@ -1279,12 +1270,21 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
session.sendAck(ack,true);
// Adjust the window size.
additionalWindowSize = Math.max(0, additionalWindowSize - deliveredMessages.size());
redeliveryDelay = 0;
deliveredCounter -= deliveredMessages.size();
deliveredMessages.clear();
} else {
// Find what redelivery delay to use, based on the redelivery count of last message.
// Current redelivery count is already increased at this point
final int currentRedeliveryCount = lastMd.getMessage().getRedeliveryCounter();
long redeliveryDelay = redeliveryPolicy.getInitialRedeliveryDelay();
// Iterating based on redelivery count to find delay to use.
// NOTE: One less than current redelivery count, to use initial delay for first redelivery.
for (int i = 0; i < (currentRedeliveryCount-1); i++) {
redeliveryDelay = redeliveryPolicy.getNextRedeliveryDelay(redeliveryDelay);
}
LOG.debug("Redelivery delay calculated for redelivery count {}: {}, for messageId '{}'.", currentRedeliveryCount, redeliveryDelay, lastMd.getMessage().getMessageId());
// only redelivery_ack after first delivery
if (currentRedeliveryCount > 0) {

View File

@ -56,7 +56,7 @@ public class RedeliveryPolicyTest extends JmsTestSupport {
}
public void testGetNext() throws Exception {
public void testGetNextWithExponentialBackoff() throws Exception {
RedeliveryPolicy policy = new RedeliveryPolicy();
policy.setInitialRedeliveryDelay(0);
@ -76,6 +76,34 @@ public class RedeliveryPolicyTest extends JmsTestSupport {
assertEquals(500, delay);
}
public void testGetNextWithExponentialBackoff_RedeliveryDelayIsIgnoredIfInitialRedeliveryDelayAboveZero() {
RedeliveryPolicy policy = new RedeliveryPolicy();
policy.setInitialRedeliveryDelay(42);
policy.setRedeliveryDelay(-100); // This is ignored in actual usage since initial > 0
policy.setBackOffMultiplier(2d);
policy.setUseExponentialBackOff(true);
// Invoke in the order employed when actually used by redelivery code paths
long delay = policy.getInitialRedeliveryDelay();
assertEquals(42, delay);
// Notice how the setRedeliveryDelay(-100) doesn't affect the calculation if initial > 0
delay = policy.getNextRedeliveryDelay(delay);
assertEquals(42*2, delay);
delay = policy.getNextRedeliveryDelay(delay);
assertEquals(42*4, delay);
// If the initial delay is 0, when given back to the policy via getNextRedeliveryDelay(), we get -100
assertEquals(-100, policy.getNextRedeliveryDelay(0));
// .. but when invoked with anything else, the backoff multiplier is used
assertEquals(123 * 2, policy.getNextRedeliveryDelay(123));
// If exponential backoff is disabled, the setRedeliveryDelay(-100) is used.
policy.setUseExponentialBackOff(false);
delay = policy.getNextRedeliveryDelay(delay);
assertEquals(-100, delay);
}
public void testGetNextWithInitialDelay() throws Exception {
RedeliveryPolicy policy = new RedeliveryPolicy();
@ -87,7 +115,6 @@ public class RedeliveryPolicyTest extends JmsTestSupport {
assertEquals(1000, delay);
delay = policy.getNextRedeliveryDelay(delay);
assertEquals(1000, delay);
}
/**
@ -145,6 +172,71 @@ public class RedeliveryPolicyTest extends JmsTestSupport {
}
/**
* By version 5.17.1 (2022-04-25), the combination of exponential redelivery with non-blocking redelivery was
* handled erroneously: The redeliveryDelay was a modifiable field on the ActiveMQMessageConsumer (not per message,
* nor calculated individually based on the message's redelivery count), and thus if multiple consecutive messages
* was rolled back multiple times in a row (i.e. redeliveries > 1), the exponential delay <i>which was kept on the
* consumer</i> would quickly result in extreme delays.
*/
public void testExponentialRedeliveryPolicyCombinedWithNonBlockingRedelivery() throws Exception {
// :: ARRANGE
// Condition #1: Create an exponential redelivery policy
RedeliveryPolicy policy = connection.getRedeliveryPolicy();
policy.setInitialRedeliveryDelay(0);
policy.setRedeliveryDelay(100);
policy.setBackOffMultiplier(2);
policy.setUseExponentialBackOff(true);
policy.setMaximumRedeliveries(4); // 5 attempts: 1 delivery + 4 redeliveries
// assert set of delays
long delay = policy.getInitialRedeliveryDelay();
assertEquals(0, delay);
delay = policy.getNextRedeliveryDelay(delay);
assertEquals(100, delay);
delay = policy.getNextRedeliveryDelay(delay);
assertEquals(200, delay);
delay = policy.getNextRedeliveryDelay(delay);
assertEquals(400, delay);
// Condition #2: Set non-blocking redelivery
connection.setNonBlockingRedelivery(true);
// :: ACT
connection.start();
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
ActiveMQQueue destination = new ActiveMQQueue(getName());
MessageProducer producer = session.createProducer(destination);
MessageConsumer consumer = session.createConsumer(destination);
// Send 'count' messages
int count = 10;
for (int i = 0; i<count; i++) {
producer.send(session.createTextMessage("#"+i));
}
session.commit();
LOG.info("{} messages sent", count);
// Receive messages, but rollback: 4+1 times each message = 5 * count
int receiveCount = 0;
// Add one extra receive, which should NOT result in a message (they should all be DLQed by then).
for (int i = 0; i < (count * 5 + 1); i++) {
// Max delay between redeliveries for these messages should be 400ms
// Waiting for 4x that = 1600 ms, to allow for hiccups during testing.
// (With the faulty code, the last calculated delay before test failing was 26214400.)
TextMessage m = (TextMessage) consumer.receive(1600);
LOG.info("Message received: {}", m);
if (m != null) {
receiveCount ++;
}
session.rollback();
}
// ASSERT
// We should have received count * 5 messages
assertEquals(count * 5, receiveCount);
}
/**
* @throws Exception