diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java index ac206e30ad..48cbfbe5ab 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java @@ -1475,8 +1475,8 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index try { messages.rollback(m.getMessageId()); if (isDLQ()) { - DeadLetterStrategy stratagy = getDeadLetterStrategy(); - stratagy.rollback(m.getMessage()); + DeadLetterStrategy strategy = getDeadLetterStrategy(); + strategy.rollback(m.getMessage()); } } finally { messagesLock.writeLock().unlock(); @@ -1560,6 +1560,9 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index throw new Exception("Retry of message is only possible on Dead Letter Queues!"); } int restoredCounter = 0; + // ensure we deal with a snapshot to avoid potential duplicates in the event of messages + // getting immediate dlq'ed + long numberOfRetryAttemptsToCheckAllMessagesOnce = this.destinationStatistics.getMessages().getCount(); Set set = new LinkedHashSet(); do { doPageIn(true); @@ -1571,6 +1574,7 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index } List list = new ArrayList(set); for (MessageReference ref : list) { + numberOfRetryAttemptsToCheckAllMessagesOnce--; if (ref.getMessage().getOriginalDestination() != null) { moveMessageTo(context, (QueueMessageReference)ref, ref.getMessage().getOriginalDestination()); @@ -1580,7 +1584,7 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index } } } - } while (set.size() < this.destinationStatistics.getMessages().getCount() && set.size() < maximumMessages); + } while (numberOfRetryAttemptsToCheckAllMessagesOnce > 0 && set.size() < this.destinationStatistics.getMessages().getCount()); return restoredCounter; } diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java index ecc689445d..0ccf1cb478 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java @@ -206,22 +206,26 @@ public class MBeanTest extends EmbeddedBrokerTestSupport { }}); + ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost,destinationType=Queue,destinationName=" + getDestinationString()); + QueueViewMBean queue = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true); + ObjectName dlqQueueViewMBeanName = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost,destinationType=Queue,destinationName=" + SharedDeadLetterStrategy.DEFAULT_DEAD_LETTER_QUEUE_NAME ); QueueViewMBean dlq = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, dlqQueueViewMBeanName, QueueViewMBean.class, true); - assertTrue("messagees on dlq", Wait.waitFor(new Wait.Condition() { + assertTrue("messages on dlq", Wait.waitFor(new Wait.Condition() { @Override public boolean isSatisified() throws Exception { + LOG.info("Dlq size: " + dlq.getQueueSize() + ", qSize: " + queue.getQueueSize()); return MESSAGE_COUNT == dlq.getQueueSize(); } })); dlq.retryMessages(); - assertTrue("messagees on dlq after retry", Wait.waitFor(new Wait.Condition() { + assertTrue("messages on dlq after retry", Wait.waitFor(new Wait.Condition() { @Override public boolean isSatisified() throws Exception { - LOG.info("Dlq size: " + dlq.getQueueSize()); + LOG.info("Dlq size: " + dlq.getQueueSize() + ", qSize: " + queue.getQueueSize()); return MESSAGE_COUNT == dlq.getQueueSize(); } }));