mirror of https://github.com/apache/activemq.git
Merge pull request #843 from stolsvik/main
AMQ-8617: RedeliveryPolicy:Exponential Backoff + NonBlockingRedelivery = too long delays
This commit is contained in:
commit
9852cf1f49
|
@ -139,7 +139,6 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
|
||||||
private PreviouslyDeliveredMap<MessageId, PreviouslyDelivered> previouslyDeliveredMessages;
|
private PreviouslyDeliveredMap<MessageId, PreviouslyDelivered> previouslyDeliveredMessages;
|
||||||
private int deliveredCounter;
|
private int deliveredCounter;
|
||||||
private int additionalWindowSize;
|
private int additionalWindowSize;
|
||||||
private long redeliveryDelay;
|
|
||||||
private int ackCounter;
|
private int ackCounter;
|
||||||
private int dispatchedCount;
|
private int dispatchedCount;
|
||||||
private final AtomicReference<MessageListener> messageListener = new AtomicReference<MessageListener>();
|
private final AtomicReference<MessageListener> messageListener = new AtomicReference<MessageListener>();
|
||||||
|
@ -1224,7 +1223,6 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
|
||||||
deliveredMessages.clear();
|
deliveredMessages.clear();
|
||||||
clearPreviouslyDelivered();
|
clearPreviouslyDelivered();
|
||||||
}
|
}
|
||||||
redeliveryDelay = 0;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public void rollback() throws JMSException {
|
public void rollback() throws JMSException {
|
||||||
|
@ -1249,14 +1247,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
// use initial delay for first redelivery
|
|
||||||
MessageDispatch lastMd = deliveredMessages.getFirst();
|
MessageDispatch lastMd = deliveredMessages.getFirst();
|
||||||
final int currentRedeliveryCount = lastMd.getMessage().getRedeliveryCounter();
|
|
||||||
if (currentRedeliveryCount > 0) {
|
|
||||||
redeliveryDelay = redeliveryPolicy.getNextRedeliveryDelay(redeliveryDelay);
|
|
||||||
} else {
|
|
||||||
redeliveryDelay = redeliveryPolicy.getInitialRedeliveryDelay();
|
|
||||||
}
|
|
||||||
MessageId firstMsgId = deliveredMessages.getLast().getMessage().getMessageId();
|
MessageId firstMsgId = deliveredMessages.getLast().getMessage().getMessageId();
|
||||||
|
|
||||||
for (Iterator<MessageDispatch> iter = deliveredMessages.iterator(); iter.hasNext();) {
|
for (Iterator<MessageDispatch> iter = deliveredMessages.iterator(); iter.hasNext();) {
|
||||||
|
@ -1279,12 +1270,21 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
|
||||||
session.sendAck(ack,true);
|
session.sendAck(ack,true);
|
||||||
// Adjust the window size.
|
// Adjust the window size.
|
||||||
additionalWindowSize = Math.max(0, additionalWindowSize - deliveredMessages.size());
|
additionalWindowSize = Math.max(0, additionalWindowSize - deliveredMessages.size());
|
||||||
redeliveryDelay = 0;
|
|
||||||
|
|
||||||
deliveredCounter -= deliveredMessages.size();
|
deliveredCounter -= deliveredMessages.size();
|
||||||
deliveredMessages.clear();
|
deliveredMessages.clear();
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
|
// Find what redelivery delay to use, based on the redelivery count of last message.
|
||||||
|
// Current redelivery count is already increased at this point
|
||||||
|
final int currentRedeliveryCount = lastMd.getMessage().getRedeliveryCounter();
|
||||||
|
long redeliveryDelay = redeliveryPolicy.getInitialRedeliveryDelay();
|
||||||
|
// Iterating based on redelivery count to find delay to use.
|
||||||
|
// NOTE: One less than current redelivery count, to use initial delay for first redelivery.
|
||||||
|
for (int i = 0; i < (currentRedeliveryCount-1); i++) {
|
||||||
|
redeliveryDelay = redeliveryPolicy.getNextRedeliveryDelay(redeliveryDelay);
|
||||||
|
}
|
||||||
|
LOG.debug("Redelivery delay calculated for redelivery count {}: {}, for messageId '{}'.", currentRedeliveryCount, redeliveryDelay, lastMd.getMessage().getMessageId());
|
||||||
|
|
||||||
// only redelivery_ack after first delivery
|
// only redelivery_ack after first delivery
|
||||||
if (currentRedeliveryCount > 0) {
|
if (currentRedeliveryCount > 0) {
|
||||||
|
|
|
@ -56,7 +56,7 @@ public class RedeliveryPolicyTest extends JmsTestSupport {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
public void testGetNext() throws Exception {
|
public void testGetNextWithExponentialBackoff() throws Exception {
|
||||||
|
|
||||||
RedeliveryPolicy policy = new RedeliveryPolicy();
|
RedeliveryPolicy policy = new RedeliveryPolicy();
|
||||||
policy.setInitialRedeliveryDelay(0);
|
policy.setInitialRedeliveryDelay(0);
|
||||||
|
@ -76,6 +76,34 @@ public class RedeliveryPolicyTest extends JmsTestSupport {
|
||||||
assertEquals(500, delay);
|
assertEquals(500, delay);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void testGetNextWithExponentialBackoff_RedeliveryDelayIsIgnoredIfInitialRedeliveryDelayAboveZero() {
|
||||||
|
|
||||||
|
RedeliveryPolicy policy = new RedeliveryPolicy();
|
||||||
|
policy.setInitialRedeliveryDelay(42);
|
||||||
|
policy.setRedeliveryDelay(-100); // This is ignored in actual usage since initial > 0
|
||||||
|
policy.setBackOffMultiplier(2d);
|
||||||
|
policy.setUseExponentialBackOff(true);
|
||||||
|
|
||||||
|
// Invoke in the order employed when actually used by redelivery code paths
|
||||||
|
long delay = policy.getInitialRedeliveryDelay();
|
||||||
|
assertEquals(42, delay);
|
||||||
|
// Notice how the setRedeliveryDelay(-100) doesn't affect the calculation if initial > 0
|
||||||
|
delay = policy.getNextRedeliveryDelay(delay);
|
||||||
|
assertEquals(42*2, delay);
|
||||||
|
delay = policy.getNextRedeliveryDelay(delay);
|
||||||
|
assertEquals(42*4, delay);
|
||||||
|
|
||||||
|
// If the initial delay is 0, when given back to the policy via getNextRedeliveryDelay(), we get -100
|
||||||
|
assertEquals(-100, policy.getNextRedeliveryDelay(0));
|
||||||
|
// .. but when invoked with anything else, the backoff multiplier is used
|
||||||
|
assertEquals(123 * 2, policy.getNextRedeliveryDelay(123));
|
||||||
|
|
||||||
|
// If exponential backoff is disabled, the setRedeliveryDelay(-100) is used.
|
||||||
|
policy.setUseExponentialBackOff(false);
|
||||||
|
delay = policy.getNextRedeliveryDelay(delay);
|
||||||
|
assertEquals(-100, delay);
|
||||||
|
}
|
||||||
|
|
||||||
public void testGetNextWithInitialDelay() throws Exception {
|
public void testGetNextWithInitialDelay() throws Exception {
|
||||||
|
|
||||||
RedeliveryPolicy policy = new RedeliveryPolicy();
|
RedeliveryPolicy policy = new RedeliveryPolicy();
|
||||||
|
@ -87,7 +115,6 @@ public class RedeliveryPolicyTest extends JmsTestSupport {
|
||||||
assertEquals(1000, delay);
|
assertEquals(1000, delay);
|
||||||
delay = policy.getNextRedeliveryDelay(delay);
|
delay = policy.getNextRedeliveryDelay(delay);
|
||||||
assertEquals(1000, delay);
|
assertEquals(1000, delay);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -145,6 +172,71 @@ public class RedeliveryPolicyTest extends JmsTestSupport {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* By version 5.17.1 (2022-04-25), the combination of exponential redelivery with non-blocking redelivery was
|
||||||
|
* handled erroneously: The redeliveryDelay was a modifiable field on the ActiveMQMessageConsumer (not per message,
|
||||||
|
* nor calculated individually based on the message's redelivery count), and thus if multiple consecutive messages
|
||||||
|
* was rolled back multiple times in a row (i.e. redeliveries > 1), the exponential delay <i>which was kept on the
|
||||||
|
* consumer</i> would quickly result in extreme delays.
|
||||||
|
*/
|
||||||
|
public void testExponentialRedeliveryPolicyCombinedWithNonBlockingRedelivery() throws Exception {
|
||||||
|
// :: ARRANGE
|
||||||
|
// Condition #1: Create an exponential redelivery policy
|
||||||
|
RedeliveryPolicy policy = connection.getRedeliveryPolicy();
|
||||||
|
policy.setInitialRedeliveryDelay(0);
|
||||||
|
policy.setRedeliveryDelay(100);
|
||||||
|
policy.setBackOffMultiplier(2);
|
||||||
|
policy.setUseExponentialBackOff(true);
|
||||||
|
policy.setMaximumRedeliveries(4); // 5 attempts: 1 delivery + 4 redeliveries
|
||||||
|
|
||||||
|
// assert set of delays
|
||||||
|
long delay = policy.getInitialRedeliveryDelay();
|
||||||
|
assertEquals(0, delay);
|
||||||
|
delay = policy.getNextRedeliveryDelay(delay);
|
||||||
|
assertEquals(100, delay);
|
||||||
|
delay = policy.getNextRedeliveryDelay(delay);
|
||||||
|
assertEquals(200, delay);
|
||||||
|
delay = policy.getNextRedeliveryDelay(delay);
|
||||||
|
assertEquals(400, delay);
|
||||||
|
|
||||||
|
// Condition #2: Set non-blocking redelivery
|
||||||
|
connection.setNonBlockingRedelivery(true);
|
||||||
|
|
||||||
|
// :: ACT
|
||||||
|
connection.start();
|
||||||
|
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
|
||||||
|
ActiveMQQueue destination = new ActiveMQQueue(getName());
|
||||||
|
MessageProducer producer = session.createProducer(destination);
|
||||||
|
|
||||||
|
MessageConsumer consumer = session.createConsumer(destination);
|
||||||
|
|
||||||
|
// Send 'count' messages
|
||||||
|
int count = 10;
|
||||||
|
for (int i = 0; i<count; i++) {
|
||||||
|
producer.send(session.createTextMessage("#"+i));
|
||||||
|
}
|
||||||
|
session.commit();
|
||||||
|
LOG.info("{} messages sent", count);
|
||||||
|
|
||||||
|
// Receive messages, but rollback: 4+1 times each message = 5 * count
|
||||||
|
int receiveCount = 0;
|
||||||
|
// Add one extra receive, which should NOT result in a message (they should all be DLQed by then).
|
||||||
|
for (int i = 0; i < (count * 5 + 1); i++) {
|
||||||
|
// Max delay between redeliveries for these messages should be 400ms
|
||||||
|
// Waiting for 4x that = 1600 ms, to allow for hiccups during testing.
|
||||||
|
// (With the faulty code, the last calculated delay before test failing was 26214400.)
|
||||||
|
TextMessage m = (TextMessage) consumer.receive(1600);
|
||||||
|
LOG.info("Message received: {}", m);
|
||||||
|
if (m != null) {
|
||||||
|
receiveCount ++;
|
||||||
|
}
|
||||||
|
session.rollback();
|
||||||
|
}
|
||||||
|
|
||||||
|
// ASSERT
|
||||||
|
// We should have received count * 5 messages
|
||||||
|
assertEquals(count * 5, receiveCount);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @throws Exception
|
* @throws Exception
|
||||||
|
|
Loading…
Reference in New Issue