From 393a696955cbf97b90576e4a85b3ce1a02268ad7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Endre=20St=C3=B8lsvik?= Date: Tue, 31 May 2022 01:56:30 +0200 Subject: [PATCH] AMQ-8617: RedeliveryPolicy:Exponential Backoff + NonBlockingRedelivery = too long delays Scenario on client: 1. Employing RedeliveryPolicy with exponential backoff (keeping maximum redeliveries at default 6) 2. Enabled non-blocking redelivery 3. Receiving e.g. 100 consecutive poison messages (which eventually should DLQ after max redeliveries) This will result in massive redelivery delays due to a logic bug. The reason is that redeliveryDelay is a field variable kept on the ActiveMQMessageConsumer, instead of being a property on the message - or that the redelivery delay was calculated per message based on the redelivery count. When consecutive messages rollbacks multiple times, the redeliveryDelay field is continuously multiplied by the backoff multiplier, resulting in enormous delays. Fix: Ditch the field variable, instead calculating the redeliveryDelay per delivery from the redelivery count. (This happens to be identical to how it is done in afterRollback() in ActiveMQSession:1004.) Test is added - which fails with the previous code, and passes with this. Added a debug log line for the calculated delay. --- .../activemq/ActiveMQMessageConsumer.java | 20 ++-- .../apache/activemq/RedeliveryPolicyTest.java | 96 ++++++++++++++++++- 2 files changed, 104 insertions(+), 12 deletions(-) diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java index 533c6234eb..f8ba61d41d 100644 --- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java +++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java @@ -139,7 +139,6 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC private PreviouslyDeliveredMap previouslyDeliveredMessages; private int deliveredCounter; private int additionalWindowSize; - private long redeliveryDelay; private int ackCounter; private int dispatchedCount; private final AtomicReference messageListener = new AtomicReference(); @@ -1224,7 +1223,6 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC deliveredMessages.clear(); clearPreviouslyDelivered(); } - redeliveryDelay = 0; } public void rollback() throws JMSException { @@ -1249,14 +1247,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC return; } - // use initial delay for first redelivery MessageDispatch lastMd = deliveredMessages.getFirst(); - final int currentRedeliveryCount = lastMd.getMessage().getRedeliveryCounter(); - if (currentRedeliveryCount > 0) { - redeliveryDelay = redeliveryPolicy.getNextRedeliveryDelay(redeliveryDelay); - } else { - redeliveryDelay = redeliveryPolicy.getInitialRedeliveryDelay(); - } MessageId firstMsgId = deliveredMessages.getLast().getMessage().getMessageId(); for (Iterator iter = deliveredMessages.iterator(); iter.hasNext();) { @@ -1279,12 +1270,21 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC session.sendAck(ack,true); // Adjust the window size. additionalWindowSize = Math.max(0, additionalWindowSize - deliveredMessages.size()); - redeliveryDelay = 0; deliveredCounter -= deliveredMessages.size(); deliveredMessages.clear(); } else { + // Find what redelivery delay to use, based on the redelivery count of last message. + // Current redelivery count is already increased at this point + final int currentRedeliveryCount = lastMd.getMessage().getRedeliveryCounter(); + long redeliveryDelay = redeliveryPolicy.getInitialRedeliveryDelay(); + // Iterating based on redelivery count to find delay to use. + // NOTE: One less than current redelivery count, to use initial delay for first redelivery. + for (int i = 0; i < (currentRedeliveryCount-1); i++) { + redeliveryDelay = redeliveryPolicy.getNextRedeliveryDelay(redeliveryDelay); + } + LOG.debug("Redelivery delay calculated for redelivery count {}: {}, for messageId '{}'.", currentRedeliveryCount, redeliveryDelay, lastMd.getMessage().getMessageId()); // only redelivery_ack after first delivery if (currentRedeliveryCount > 0) { diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/RedeliveryPolicyTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/RedeliveryPolicyTest.java index 5f325a4918..85bdc312aa 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/RedeliveryPolicyTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/RedeliveryPolicyTest.java @@ -56,7 +56,7 @@ public class RedeliveryPolicyTest extends JmsTestSupport { } - public void testGetNext() throws Exception { + public void testGetNextWithExponentialBackoff() throws Exception { RedeliveryPolicy policy = new RedeliveryPolicy(); policy.setInitialRedeliveryDelay(0); @@ -76,6 +76,34 @@ public class RedeliveryPolicyTest extends JmsTestSupport { assertEquals(500, delay); } + public void testGetNextWithExponentialBackoff_RedeliveryDelayIsIgnoredIfInitialRedeliveryDelayAboveZero() { + + RedeliveryPolicy policy = new RedeliveryPolicy(); + policy.setInitialRedeliveryDelay(42); + policy.setRedeliveryDelay(-100); // This is ignored in actual usage since initial > 0 + policy.setBackOffMultiplier(2d); + policy.setUseExponentialBackOff(true); + + // Invoke in the order employed when actually used by redelivery code paths + long delay = policy.getInitialRedeliveryDelay(); + assertEquals(42, delay); + // Notice how the setRedeliveryDelay(-100) doesn't affect the calculation if initial > 0 + delay = policy.getNextRedeliveryDelay(delay); + assertEquals(42*2, delay); + delay = policy.getNextRedeliveryDelay(delay); + assertEquals(42*4, delay); + + // If the initial delay is 0, when given back to the policy via getNextRedeliveryDelay(), we get -100 + assertEquals(-100, policy.getNextRedeliveryDelay(0)); + // .. but when invoked with anything else, the backoff multiplier is used + assertEquals(123 * 2, policy.getNextRedeliveryDelay(123)); + + // If exponential backoff is disabled, the setRedeliveryDelay(-100) is used. + policy.setUseExponentialBackOff(false); + delay = policy.getNextRedeliveryDelay(delay); + assertEquals(-100, delay); + } + public void testGetNextWithInitialDelay() throws Exception { RedeliveryPolicy policy = new RedeliveryPolicy(); @@ -87,7 +115,6 @@ public class RedeliveryPolicyTest extends JmsTestSupport { assertEquals(1000, delay); delay = policy.getNextRedeliveryDelay(delay); assertEquals(1000, delay); - } /** @@ -145,6 +172,71 @@ public class RedeliveryPolicyTest extends JmsTestSupport { } + /** + * By version 5.17.1 (2022-04-25), the combination of exponential redelivery with non-blocking redelivery was + * handled erroneously: The redeliveryDelay was a modifiable field on the ActiveMQMessageConsumer (not per message, + * nor calculated individually based on the message's redelivery count), and thus if multiple consecutive messages + * was rolled back multiple times in a row (i.e. redeliveries > 1), the exponential delay which was kept on the + * consumer would quickly result in extreme delays. + */ + public void testExponentialRedeliveryPolicyCombinedWithNonBlockingRedelivery() throws Exception { + // :: ARRANGE + // Condition #1: Create an exponential redelivery policy + RedeliveryPolicy policy = connection.getRedeliveryPolicy(); + policy.setInitialRedeliveryDelay(0); + policy.setRedeliveryDelay(100); + policy.setBackOffMultiplier(2); + policy.setUseExponentialBackOff(true); + policy.setMaximumRedeliveries(4); // 5 attempts: 1 delivery + 4 redeliveries + + // assert set of delays + long delay = policy.getInitialRedeliveryDelay(); + assertEquals(0, delay); + delay = policy.getNextRedeliveryDelay(delay); + assertEquals(100, delay); + delay = policy.getNextRedeliveryDelay(delay); + assertEquals(200, delay); + delay = policy.getNextRedeliveryDelay(delay); + assertEquals(400, delay); + + // Condition #2: Set non-blocking redelivery + connection.setNonBlockingRedelivery(true); + + // :: ACT + connection.start(); + Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); + ActiveMQQueue destination = new ActiveMQQueue(getName()); + MessageProducer producer = session.createProducer(destination); + + MessageConsumer consumer = session.createConsumer(destination); + + // Send 'count' messages + int count = 10; + for (int i = 0; i