From 03b19b9da4d50c3bb8985f930e93596c7d994d26 Mon Sep 17 00:00:00 2001 From: gtully Date: Wed, 1 Nov 2017 11:26:36 +0000 Subject: [PATCH] [AMQ-6847] limit the retry loop to one iteration overa all pending messages such that new additions are not replayed to avoid duplicates --- .../java/org/apache/activemq/broker/region/Queue.java | 10 +++++++--- .../java/org/apache/activemq/broker/jmx/MBeanTest.java | 10 +++++++--- 2 files changed, 14 insertions(+), 6 deletions(-) diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java index 4a2d272fc8..04ef3fd22f 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java @@ -1484,8 +1484,8 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index try { messages.rollback(m.getMessageId()); if (isDLQ()) { - DeadLetterStrategy stratagy = getDeadLetterStrategy(); - stratagy.rollback(m.getMessage()); + DeadLetterStrategy strategy = getDeadLetterStrategy(); + strategy.rollback(m.getMessage()); } } finally { messagesLock.writeLock().unlock(); @@ -1569,6 +1569,9 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index throw new Exception("Retry of message is only possible on Dead Letter Queues!"); } int restoredCounter = 0; + // ensure we deal with a snapshot to avoid potential duplicates in the event of messages + // getting immediate dlq'ed + long numberOfRetryAttemptsToCheckAllMessagesOnce = this.destinationStatistics.getMessages().getCount(); Set set = new LinkedHashSet(); do { doPageIn(true); @@ -1580,6 +1583,7 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index } List list = new ArrayList(set); for (MessageReference ref : list) { + numberOfRetryAttemptsToCheckAllMessagesOnce--; if (ref.getMessage().getOriginalDestination() != null) { moveMessageTo(context, (QueueMessageReference)ref, ref.getMessage().getOriginalDestination()); @@ -1589,7 +1593,7 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index } } } - } while (set.size() < this.destinationStatistics.getMessages().getCount() && set.size() < maximumMessages); + } while (numberOfRetryAttemptsToCheckAllMessagesOnce > 0 && set.size() < this.destinationStatistics.getMessages().getCount()); return restoredCounter; } diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java index ecc689445d..0ccf1cb478 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java @@ -206,22 +206,26 @@ public class MBeanTest extends EmbeddedBrokerTestSupport { }}); + ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost,destinationType=Queue,destinationName=" + getDestinationString()); + QueueViewMBean queue = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true); + ObjectName dlqQueueViewMBeanName = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost,destinationType=Queue,destinationName=" + SharedDeadLetterStrategy.DEFAULT_DEAD_LETTER_QUEUE_NAME ); QueueViewMBean dlq = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, dlqQueueViewMBeanName, QueueViewMBean.class, true); - assertTrue("messagees on dlq", Wait.waitFor(new Wait.Condition() { + assertTrue("messages on dlq", Wait.waitFor(new Wait.Condition() { @Override public boolean isSatisified() throws Exception { + LOG.info("Dlq size: " + dlq.getQueueSize() + ", qSize: " + queue.getQueueSize()); return MESSAGE_COUNT == dlq.getQueueSize(); } })); dlq.retryMessages(); - assertTrue("messagees on dlq after retry", Wait.waitFor(new Wait.Condition() { + assertTrue("messages on dlq after retry", Wait.waitFor(new Wait.Condition() { @Override public boolean isSatisified() throws Exception { - LOG.info("Dlq size: " + dlq.getQueueSize()); + LOG.info("Dlq size: " + dlq.getQueueSize() + ", qSize: " + queue.getQueueSize()); return MESSAGE_COUNT == dlq.getQueueSize(); } }));