diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java index fba78cb5a7..ac206e30ad 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java @@ -1462,18 +1462,33 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index * @throws Exception */ public boolean moveMessageTo(ConnectionContext context, QueueMessageReference m, ActiveMQDestination dest) throws Exception { - BrokerSupport.resend(context, m.getMessage(), dest); - removeMessage(context, m); - messagesLock.writeLock().lock(); + Set destsToPause = regionBroker.getDestinations(dest); try { - messages.rollback(m.getMessageId()); - if (isDLQ()) { - DeadLetterStrategy stratagy = getDeadLetterStrategy(); - stratagy.rollback(m.getMessage()); + for (Destination d: destsToPause) { + if (d instanceof Queue) { + ((Queue)d).pauseDispatch(); + } + } + BrokerSupport.resend(context, m.getMessage(), dest); + removeMessage(context, m); + messagesLock.writeLock().lock(); + try { + messages.rollback(m.getMessageId()); + if (isDLQ()) { + DeadLetterStrategy stratagy = getDeadLetterStrategy(); + stratagy.rollback(m.getMessage()); + } + } finally { + messagesLock.writeLock().unlock(); } } finally { - messagesLock.writeLock().unlock(); + for (Destination d: destsToPause) { + if (d instanceof Queue) { + ((Queue)d).resumeDispatch(); + } + } } + return true; } diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java index d72d709286..ecc689445d 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java @@ -54,6 +54,7 @@ import org.apache.activemq.ActiveMQPrefetchPolicy; import org.apache.activemq.ActiveMQSession; import org.apache.activemq.BlobMessage; import org.apache.activemq.EmbeddedBrokerTestSupport; +import org.apache.activemq.RedeliveryPolicy; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.region.BaseDestination; import org.apache.activemq.broker.region.policy.PolicyEntry; @@ -180,6 +181,52 @@ public class MBeanTest extends EmbeddedBrokerTestSupport { assertEquals("no forwards", 0, queueNew.getForwardCount()); } + public void testMoveFromDLQImmediateDLQ() throws Exception { + + RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy(); + redeliveryPolicy.setMaximumRedeliveries(0); + ((ActiveMQConnectionFactory)connectionFactory).setRedeliveryPolicy(redeliveryPolicy); + Connection connection = connectionFactory.createConnection(); + + // populate + useConnection(connection); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Destination dest = session.createQueue(getDestinationString()); + MessageConsumer consumer = session.createConsumer(dest); + consumer.setMessageListener(new MessageListener() { + @Override + public void onMessage(Message message) { + try { + System.out.println("Received: " + message + " on " + message.getJMSDestination()); + } catch (JMSException e) { + e.printStackTrace(); + } + throw new RuntimeException("Horrible exception"); + }}); + + + ObjectName dlqQueueViewMBeanName = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost,destinationType=Queue,destinationName=" + SharedDeadLetterStrategy.DEFAULT_DEAD_LETTER_QUEUE_NAME ); + QueueViewMBean dlq = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, dlqQueueViewMBeanName, QueueViewMBean.class, true); + + assertTrue("messagees on dlq", Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + return MESSAGE_COUNT == dlq.getQueueSize(); + } + })); + + dlq.retryMessages(); + + assertTrue("messagees on dlq after retry", Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + LOG.info("Dlq size: " + dlq.getQueueSize()); + return MESSAGE_COUNT == dlq.getQueueSize(); + } + })); + } + //Show broken behaviour https://issues.apache.org/jira/browse/AMQ-5752" // points to the need to except on a duplicate or have store.addMessage return boolean // need some thought on how best to resolve this