diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java index 533c6234eb..f8ba61d41d 100644 --- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java +++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java @@ -139,7 +139,6 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC private PreviouslyDeliveredMap previouslyDeliveredMessages; private int deliveredCounter; private int additionalWindowSize; - private long redeliveryDelay; private int ackCounter; private int dispatchedCount; private final AtomicReference messageListener = new AtomicReference(); @@ -1224,7 +1223,6 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC deliveredMessages.clear(); clearPreviouslyDelivered(); } - redeliveryDelay = 0; } public void rollback() throws JMSException { @@ -1249,14 +1247,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC return; } - // use initial delay for first redelivery MessageDispatch lastMd = deliveredMessages.getFirst(); - final int currentRedeliveryCount = lastMd.getMessage().getRedeliveryCounter(); - if (currentRedeliveryCount > 0) { - redeliveryDelay = redeliveryPolicy.getNextRedeliveryDelay(redeliveryDelay); - } else { - redeliveryDelay = redeliveryPolicy.getInitialRedeliveryDelay(); - } MessageId firstMsgId = deliveredMessages.getLast().getMessage().getMessageId(); for (Iterator iter = deliveredMessages.iterator(); iter.hasNext();) { @@ -1279,12 +1270,21 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC session.sendAck(ack,true); // Adjust the window size. additionalWindowSize = Math.max(0, additionalWindowSize - deliveredMessages.size()); - redeliveryDelay = 0; deliveredCounter -= deliveredMessages.size(); deliveredMessages.clear(); } else { + // Find what redelivery delay to use, based on the redelivery count of last message. + // Current redelivery count is already increased at this point + final int currentRedeliveryCount = lastMd.getMessage().getRedeliveryCounter(); + long redeliveryDelay = redeliveryPolicy.getInitialRedeliveryDelay(); + // Iterating based on redelivery count to find delay to use. + // NOTE: One less than current redelivery count, to use initial delay for first redelivery. + for (int i = 0; i < (currentRedeliveryCount-1); i++) { + redeliveryDelay = redeliveryPolicy.getNextRedeliveryDelay(redeliveryDelay); + } + LOG.debug("Redelivery delay calculated for redelivery count {}: {}, for messageId '{}'.", currentRedeliveryCount, redeliveryDelay, lastMd.getMessage().getMessageId()); // only redelivery_ack after first delivery if (currentRedeliveryCount > 0) { diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/RedeliveryPolicyTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/RedeliveryPolicyTest.java index 5f325a4918..85bdc312aa 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/RedeliveryPolicyTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/RedeliveryPolicyTest.java @@ -56,7 +56,7 @@ public class RedeliveryPolicyTest extends JmsTestSupport { } - public void testGetNext() throws Exception { + public void testGetNextWithExponentialBackoff() throws Exception { RedeliveryPolicy policy = new RedeliveryPolicy(); policy.setInitialRedeliveryDelay(0); @@ -76,6 +76,34 @@ public class RedeliveryPolicyTest extends JmsTestSupport { assertEquals(500, delay); } + public void testGetNextWithExponentialBackoff_RedeliveryDelayIsIgnoredIfInitialRedeliveryDelayAboveZero() { + + RedeliveryPolicy policy = new RedeliveryPolicy(); + policy.setInitialRedeliveryDelay(42); + policy.setRedeliveryDelay(-100); // This is ignored in actual usage since initial > 0 + policy.setBackOffMultiplier(2d); + policy.setUseExponentialBackOff(true); + + // Invoke in the order employed when actually used by redelivery code paths + long delay = policy.getInitialRedeliveryDelay(); + assertEquals(42, delay); + // Notice how the setRedeliveryDelay(-100) doesn't affect the calculation if initial > 0 + delay = policy.getNextRedeliveryDelay(delay); + assertEquals(42*2, delay); + delay = policy.getNextRedeliveryDelay(delay); + assertEquals(42*4, delay); + + // If the initial delay is 0, when given back to the policy via getNextRedeliveryDelay(), we get -100 + assertEquals(-100, policy.getNextRedeliveryDelay(0)); + // .. but when invoked with anything else, the backoff multiplier is used + assertEquals(123 * 2, policy.getNextRedeliveryDelay(123)); + + // If exponential backoff is disabled, the setRedeliveryDelay(-100) is used. + policy.setUseExponentialBackOff(false); + delay = policy.getNextRedeliveryDelay(delay); + assertEquals(-100, delay); + } + public void testGetNextWithInitialDelay() throws Exception { RedeliveryPolicy policy = new RedeliveryPolicy(); @@ -87,7 +115,6 @@ public class RedeliveryPolicyTest extends JmsTestSupport { assertEquals(1000, delay); delay = policy.getNextRedeliveryDelay(delay); assertEquals(1000, delay); - } /** @@ -145,6 +172,71 @@ public class RedeliveryPolicyTest extends JmsTestSupport { } + /** + * By version 5.17.1 (2022-04-25), the combination of exponential redelivery with non-blocking redelivery was + * handled erroneously: The redeliveryDelay was a modifiable field on the ActiveMQMessageConsumer (not per message, + * nor calculated individually based on the message's redelivery count), and thus if multiple consecutive messages + * was rolled back multiple times in a row (i.e. redeliveries > 1), the exponential delay which was kept on the + * consumer would quickly result in extreme delays. + */ + public void testExponentialRedeliveryPolicyCombinedWithNonBlockingRedelivery() throws Exception { + // :: ARRANGE + // Condition #1: Create an exponential redelivery policy + RedeliveryPolicy policy = connection.getRedeliveryPolicy(); + policy.setInitialRedeliveryDelay(0); + policy.setRedeliveryDelay(100); + policy.setBackOffMultiplier(2); + policy.setUseExponentialBackOff(true); + policy.setMaximumRedeliveries(4); // 5 attempts: 1 delivery + 4 redeliveries + + // assert set of delays + long delay = policy.getInitialRedeliveryDelay(); + assertEquals(0, delay); + delay = policy.getNextRedeliveryDelay(delay); + assertEquals(100, delay); + delay = policy.getNextRedeliveryDelay(delay); + assertEquals(200, delay); + delay = policy.getNextRedeliveryDelay(delay); + assertEquals(400, delay); + + // Condition #2: Set non-blocking redelivery + connection.setNonBlockingRedelivery(true); + + // :: ACT + connection.start(); + Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); + ActiveMQQueue destination = new ActiveMQQueue(getName()); + MessageProducer producer = session.createProducer(destination); + + MessageConsumer consumer = session.createConsumer(destination); + + // Send 'count' messages + int count = 10; + for (int i = 0; i