[AMQ-6847] limit the retry loop to one iteration overa all pending messages such that new additions are not replayed to avoid duplicates

(cherry picked from commit 03b19b9da4)
This commit is contained in:
gtully 2017-11-01 11:26:36 +00:00 committed by Christopher L. Shannon (cshannon)
parent eb9e50f3c9
commit 0464d53233
2 changed files with 14 additions and 6 deletions

View File

@ -1475,8 +1475,8 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
try {
messages.rollback(m.getMessageId());
if (isDLQ()) {
DeadLetterStrategy stratagy = getDeadLetterStrategy();
stratagy.rollback(m.getMessage());
DeadLetterStrategy strategy = getDeadLetterStrategy();
strategy.rollback(m.getMessage());
}
} finally {
messagesLock.writeLock().unlock();
@ -1560,6 +1560,9 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
throw new Exception("Retry of message is only possible on Dead Letter Queues!");
}
int restoredCounter = 0;
// ensure we deal with a snapshot to avoid potential duplicates in the event of messages
// getting immediate dlq'ed
long numberOfRetryAttemptsToCheckAllMessagesOnce = this.destinationStatistics.getMessages().getCount();
Set<MessageReference> set = new LinkedHashSet<MessageReference>();
do {
doPageIn(true);
@ -1571,6 +1574,7 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
}
List<MessageReference> list = new ArrayList<MessageReference>(set);
for (MessageReference ref : list) {
numberOfRetryAttemptsToCheckAllMessagesOnce--;
if (ref.getMessage().getOriginalDestination() != null) {
moveMessageTo(context, (QueueMessageReference)ref, ref.getMessage().getOriginalDestination());
@ -1580,7 +1584,7 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
}
}
}
} while (set.size() < this.destinationStatistics.getMessages().getCount() && set.size() < maximumMessages);
} while (numberOfRetryAttemptsToCheckAllMessagesOnce > 0 && set.size() < this.destinationStatistics.getMessages().getCount());
return restoredCounter;
}

View File

@ -206,22 +206,26 @@ public class MBeanTest extends EmbeddedBrokerTestSupport {
}});
ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost,destinationType=Queue,destinationName=" + getDestinationString());
QueueViewMBean queue = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
ObjectName dlqQueueViewMBeanName = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost,destinationType=Queue,destinationName=" + SharedDeadLetterStrategy.DEFAULT_DEAD_LETTER_QUEUE_NAME );
QueueViewMBean dlq = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, dlqQueueViewMBeanName, QueueViewMBean.class, true);
assertTrue("messagees on dlq", Wait.waitFor(new Wait.Condition() {
assertTrue("messages on dlq", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
LOG.info("Dlq size: " + dlq.getQueueSize() + ", qSize: " + queue.getQueueSize());
return MESSAGE_COUNT == dlq.getQueueSize();
}
}));
dlq.retryMessages();
assertTrue("messagees on dlq after retry", Wait.waitFor(new Wait.Condition() {
assertTrue("messages on dlq after retry", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
LOG.info("Dlq size: " + dlq.getQueueSize());
LOG.info("Dlq size: " + dlq.getQueueSize() + ", qSize: " + queue.getQueueSize());
return MESSAGE_COUNT == dlq.getQueueSize();
}
}));