[AMQ-6847] pause dispatch for message move to avoid redelivery with pending ack/remove/audit rollback

(cherry picked from commit 2ea5d1420b)
This commit is contained in:
gtully 2017-10-27 11:40:06 +01:00 committed by Christopher L. Shannon (cshannon)
parent b04b971685
commit eb9e50f3c9
2 changed files with 70 additions and 8 deletions

View File

@ -1462,18 +1462,33 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
* @throws Exception
*/
public boolean moveMessageTo(ConnectionContext context, QueueMessageReference m, ActiveMQDestination dest) throws Exception {
BrokerSupport.resend(context, m.getMessage(), dest);
removeMessage(context, m);
messagesLock.writeLock().lock();
Set<Destination> destsToPause = regionBroker.getDestinations(dest);
try {
messages.rollback(m.getMessageId());
if (isDLQ()) {
DeadLetterStrategy stratagy = getDeadLetterStrategy();
stratagy.rollback(m.getMessage());
for (Destination d: destsToPause) {
if (d instanceof Queue) {
((Queue)d).pauseDispatch();
}
}
BrokerSupport.resend(context, m.getMessage(), dest);
removeMessage(context, m);
messagesLock.writeLock().lock();
try {
messages.rollback(m.getMessageId());
if (isDLQ()) {
DeadLetterStrategy stratagy = getDeadLetterStrategy();
stratagy.rollback(m.getMessage());
}
} finally {
messagesLock.writeLock().unlock();
}
} finally {
messagesLock.writeLock().unlock();
for (Destination d: destsToPause) {
if (d instanceof Queue) {
((Queue)d).resumeDispatch();
}
}
}
return true;
}

View File

@ -54,6 +54,7 @@ import org.apache.activemq.ActiveMQPrefetchPolicy;
import org.apache.activemq.ActiveMQSession;
import org.apache.activemq.BlobMessage;
import org.apache.activemq.EmbeddedBrokerTestSupport;
import org.apache.activemq.RedeliveryPolicy;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.BaseDestination;
import org.apache.activemq.broker.region.policy.PolicyEntry;
@ -180,6 +181,52 @@ public class MBeanTest extends EmbeddedBrokerTestSupport {
assertEquals("no forwards", 0, queueNew.getForwardCount());
}
public void testMoveFromDLQImmediateDLQ() throws Exception {
RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
redeliveryPolicy.setMaximumRedeliveries(0);
((ActiveMQConnectionFactory)connectionFactory).setRedeliveryPolicy(redeliveryPolicy);
Connection connection = connectionFactory.createConnection();
// populate
useConnection(connection);
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination dest = session.createQueue(getDestinationString());
MessageConsumer consumer = session.createConsumer(dest);
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
try {
System.out.println("Received: " + message + " on " + message.getJMSDestination());
} catch (JMSException e) {
e.printStackTrace();
}
throw new RuntimeException("Horrible exception");
}});
ObjectName dlqQueueViewMBeanName = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost,destinationType=Queue,destinationName=" + SharedDeadLetterStrategy.DEFAULT_DEAD_LETTER_QUEUE_NAME );
QueueViewMBean dlq = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, dlqQueueViewMBeanName, QueueViewMBean.class, true);
assertTrue("messagees on dlq", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return MESSAGE_COUNT == dlq.getQueueSize();
}
}));
dlq.retryMessages();
assertTrue("messagees on dlq after retry", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
LOG.info("Dlq size: " + dlq.getQueueSize());
return MESSAGE_COUNT == dlq.getQueueSize();
}
}));
}
//Show broken behaviour https://issues.apache.org/jira/browse/AMQ-5752"
// points to the need to except on a duplicate or have store.addMessage return boolean
// need some thought on how best to resolve this