mirror of https://github.com/apache/activemq.git
[AMQ-6847] pause dispatch for message move to avoid redelivery with pending ack/remove/audit rollback
This commit is contained in:
parent
005403e94b
commit
2ea5d1420b
|
@ -1471,6 +1471,13 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
|
||||||
* @throws Exception
|
* @throws Exception
|
||||||
*/
|
*/
|
||||||
public boolean moveMessageTo(ConnectionContext context, QueueMessageReference m, ActiveMQDestination dest) throws Exception {
|
public boolean moveMessageTo(ConnectionContext context, QueueMessageReference m, ActiveMQDestination dest) throws Exception {
|
||||||
|
Set<Destination> destsToPause = regionBroker.getDestinations(dest);
|
||||||
|
try {
|
||||||
|
for (Destination d: destsToPause) {
|
||||||
|
if (d instanceof Queue) {
|
||||||
|
((Queue)d).pauseDispatch();
|
||||||
|
}
|
||||||
|
}
|
||||||
BrokerSupport.resend(context, m.getMessage(), dest);
|
BrokerSupport.resend(context, m.getMessage(), dest);
|
||||||
removeMessage(context, m);
|
removeMessage(context, m);
|
||||||
messagesLock.writeLock().lock();
|
messagesLock.writeLock().lock();
|
||||||
|
@ -1483,6 +1490,14 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
|
||||||
} finally {
|
} finally {
|
||||||
messagesLock.writeLock().unlock();
|
messagesLock.writeLock().unlock();
|
||||||
}
|
}
|
||||||
|
} finally {
|
||||||
|
for (Destination d: destsToPause) {
|
||||||
|
if (d instanceof Queue) {
|
||||||
|
((Queue)d).resumeDispatch();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -54,6 +54,7 @@ import org.apache.activemq.ActiveMQPrefetchPolicy;
|
||||||
import org.apache.activemq.ActiveMQSession;
|
import org.apache.activemq.ActiveMQSession;
|
||||||
import org.apache.activemq.BlobMessage;
|
import org.apache.activemq.BlobMessage;
|
||||||
import org.apache.activemq.EmbeddedBrokerTestSupport;
|
import org.apache.activemq.EmbeddedBrokerTestSupport;
|
||||||
|
import org.apache.activemq.RedeliveryPolicy;
|
||||||
import org.apache.activemq.broker.BrokerService;
|
import org.apache.activemq.broker.BrokerService;
|
||||||
import org.apache.activemq.broker.region.BaseDestination;
|
import org.apache.activemq.broker.region.BaseDestination;
|
||||||
import org.apache.activemq.broker.region.policy.PolicyEntry;
|
import org.apache.activemq.broker.region.policy.PolicyEntry;
|
||||||
|
@ -180,6 +181,52 @@ public class MBeanTest extends EmbeddedBrokerTestSupport {
|
||||||
assertEquals("no forwards", 0, queueNew.getForwardCount());
|
assertEquals("no forwards", 0, queueNew.getForwardCount());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void testMoveFromDLQImmediateDLQ() throws Exception {
|
||||||
|
|
||||||
|
RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
|
||||||
|
redeliveryPolicy.setMaximumRedeliveries(0);
|
||||||
|
((ActiveMQConnectionFactory)connectionFactory).setRedeliveryPolicy(redeliveryPolicy);
|
||||||
|
Connection connection = connectionFactory.createConnection();
|
||||||
|
|
||||||
|
// populate
|
||||||
|
useConnection(connection);
|
||||||
|
|
||||||
|
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
|
Destination dest = session.createQueue(getDestinationString());
|
||||||
|
MessageConsumer consumer = session.createConsumer(dest);
|
||||||
|
consumer.setMessageListener(new MessageListener() {
|
||||||
|
@Override
|
||||||
|
public void onMessage(Message message) {
|
||||||
|
try {
|
||||||
|
System.out.println("Received: " + message + " on " + message.getJMSDestination());
|
||||||
|
} catch (JMSException e) {
|
||||||
|
e.printStackTrace();
|
||||||
|
}
|
||||||
|
throw new RuntimeException("Horrible exception");
|
||||||
|
}});
|
||||||
|
|
||||||
|
|
||||||
|
ObjectName dlqQueueViewMBeanName = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost,destinationType=Queue,destinationName=" + SharedDeadLetterStrategy.DEFAULT_DEAD_LETTER_QUEUE_NAME );
|
||||||
|
QueueViewMBean dlq = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, dlqQueueViewMBeanName, QueueViewMBean.class, true);
|
||||||
|
|
||||||
|
assertTrue("messagees on dlq", Wait.waitFor(new Wait.Condition() {
|
||||||
|
@Override
|
||||||
|
public boolean isSatisified() throws Exception {
|
||||||
|
return MESSAGE_COUNT == dlq.getQueueSize();
|
||||||
|
}
|
||||||
|
}));
|
||||||
|
|
||||||
|
dlq.retryMessages();
|
||||||
|
|
||||||
|
assertTrue("messagees on dlq after retry", Wait.waitFor(new Wait.Condition() {
|
||||||
|
@Override
|
||||||
|
public boolean isSatisified() throws Exception {
|
||||||
|
LOG.info("Dlq size: " + dlq.getQueueSize());
|
||||||
|
return MESSAGE_COUNT == dlq.getQueueSize();
|
||||||
|
}
|
||||||
|
}));
|
||||||
|
}
|
||||||
|
|
||||||
//Show broken behaviour https://issues.apache.org/jira/browse/AMQ-5752"
|
//Show broken behaviour https://issues.apache.org/jira/browse/AMQ-5752"
|
||||||
// points to the need to except on a duplicate or have store.addMessage return boolean
|
// points to the need to except on a duplicate or have store.addMessage return boolean
|
||||||
// need some thought on how best to resolve this
|
// need some thought on how best to resolve this
|
||||||
|
|
Loading…
Reference in New Issue