[AMQ-6847] limit the retry loop to one iteration overa all pending messages such that new additions are not replayed to avoid duplicates

This commit is contained in:
gtully 2017-11-01 11:26:36 +00:00
parent 2ea5d1420b
commit 03b19b9da4
2 changed files with 14 additions and 6 deletions

View File

@ -1484,8 +1484,8 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
try { try {
messages.rollback(m.getMessageId()); messages.rollback(m.getMessageId());
if (isDLQ()) { if (isDLQ()) {
DeadLetterStrategy stratagy = getDeadLetterStrategy(); DeadLetterStrategy strategy = getDeadLetterStrategy();
stratagy.rollback(m.getMessage()); strategy.rollback(m.getMessage());
} }
} finally { } finally {
messagesLock.writeLock().unlock(); messagesLock.writeLock().unlock();
@ -1569,6 +1569,9 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
throw new Exception("Retry of message is only possible on Dead Letter Queues!"); throw new Exception("Retry of message is only possible on Dead Letter Queues!");
} }
int restoredCounter = 0; int restoredCounter = 0;
// ensure we deal with a snapshot to avoid potential duplicates in the event of messages
// getting immediate dlq'ed
long numberOfRetryAttemptsToCheckAllMessagesOnce = this.destinationStatistics.getMessages().getCount();
Set<MessageReference> set = new LinkedHashSet<MessageReference>(); Set<MessageReference> set = new LinkedHashSet<MessageReference>();
do { do {
doPageIn(true); doPageIn(true);
@ -1580,6 +1583,7 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
} }
List<MessageReference> list = new ArrayList<MessageReference>(set); List<MessageReference> list = new ArrayList<MessageReference>(set);
for (MessageReference ref : list) { for (MessageReference ref : list) {
numberOfRetryAttemptsToCheckAllMessagesOnce--;
if (ref.getMessage().getOriginalDestination() != null) { if (ref.getMessage().getOriginalDestination() != null) {
moveMessageTo(context, (QueueMessageReference)ref, ref.getMessage().getOriginalDestination()); moveMessageTo(context, (QueueMessageReference)ref, ref.getMessage().getOriginalDestination());
@ -1589,7 +1593,7 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
} }
} }
} }
} while (set.size() < this.destinationStatistics.getMessages().getCount() && set.size() < maximumMessages); } while (numberOfRetryAttemptsToCheckAllMessagesOnce > 0 && set.size() < this.destinationStatistics.getMessages().getCount());
return restoredCounter; return restoredCounter;
} }

View File

@ -206,22 +206,26 @@ public class MBeanTest extends EmbeddedBrokerTestSupport {
}}); }});
ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost,destinationType=Queue,destinationName=" + getDestinationString());
QueueViewMBean queue = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
ObjectName dlqQueueViewMBeanName = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost,destinationType=Queue,destinationName=" + SharedDeadLetterStrategy.DEFAULT_DEAD_LETTER_QUEUE_NAME ); ObjectName dlqQueueViewMBeanName = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost,destinationType=Queue,destinationName=" + SharedDeadLetterStrategy.DEFAULT_DEAD_LETTER_QUEUE_NAME );
QueueViewMBean dlq = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, dlqQueueViewMBeanName, QueueViewMBean.class, true); QueueViewMBean dlq = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, dlqQueueViewMBeanName, QueueViewMBean.class, true);
assertTrue("messagees on dlq", Wait.waitFor(new Wait.Condition() { assertTrue("messages on dlq", Wait.waitFor(new Wait.Condition() {
@Override @Override
public boolean isSatisified() throws Exception { public boolean isSatisified() throws Exception {
LOG.info("Dlq size: " + dlq.getQueueSize() + ", qSize: " + queue.getQueueSize());
return MESSAGE_COUNT == dlq.getQueueSize(); return MESSAGE_COUNT == dlq.getQueueSize();
} }
})); }));
dlq.retryMessages(); dlq.retryMessages();
assertTrue("messagees on dlq after retry", Wait.waitFor(new Wait.Condition() { assertTrue("messages on dlq after retry", Wait.waitFor(new Wait.Condition() {
@Override @Override
public boolean isSatisified() throws Exception { public boolean isSatisified() throws Exception {
LOG.info("Dlq size: " + dlq.getQueueSize()); LOG.info("Dlq size: " + dlq.getQueueSize() + ", qSize: " + queue.getQueueSize());
return MESSAGE_COUNT == dlq.getQueueSize(); return MESSAGE_COUNT == dlq.getQueueSize();
} }
})); }));