AMQ-7464 - ensure message.copy before server session run dispatch

This commit is contained in:
gtully 2020-04-08 14:57:49 +01:00
parent da9e4028c9
commit aa03f295f5
2 changed files with 117 additions and 4 deletions

View File

@ -73,6 +73,7 @@ import org.apache.activemq.command.ActiveMQTempTopic;
import org.apache.activemq.command.ActiveMQTextMessage;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.Command;
import org.apache.activemq.command.CommandTypes;
import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatch;
@ -882,7 +883,16 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
MessageDispatch messageDispatch;
while ((messageDispatch = executor.dequeueNoWait()) != null) {
final MessageDispatch md = messageDispatch;
final ActiveMQMessage message = (ActiveMQMessage)md.getMessage();
// subset of org.apache.activemq.ActiveMQMessageConsumer.createActiveMQMessage
final ActiveMQMessage message = (ActiveMQMessage)md.getMessage().copy();
if (message.getDataStructureType()==CommandTypes.ACTIVEMQ_BLOB_MESSAGE) {
((ActiveMQBlobMessage)message).setBlobDownloader(new BlobDownloader(getBlobTransferPolicy()));
}
if (message.getDataStructureType() == CommandTypes.ACTIVEMQ_OBJECT_MESSAGE) {
((ActiveMQObjectMessage)message).setTrustAllPackages(getConnection().isTrustAllPackages());
((ActiveMQObjectMessage)message).setTrustedPackages(getConnection().getTrustedPackages());
}
MessageAck earlyAck = null;
if (message.isExpired()) {
@ -951,7 +961,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
@Override
public void afterRollback() throws Exception {
if (LOG.isTraceEnabled()) {
LOG.trace("rollback {}", ack, new Throwable("here"));
LOG.trace("afterRollback {}", ack, new Throwable("here"));
}
// ensure we don't filter this as a duplicate
connection.rollbackDuplicate(ActiveMQSession.this, md.getMessage());
@ -979,6 +989,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
MessageAck ack = new MessageAck(md, MessageAck.POSION_ACK_TYPE, 1);
ack.setFirstMessageId(md.getMessage().getMessageId());
ack.setPoisonCause(new Throwable("Exceeded ra redelivery policy limit:" + redeliveryPolicy));
LOG.trace("Exceeded redelivery with count: {}, Ack: {}", redeliveryCounter, ack);
asyncSendPacket(ack);
} else {

View File

@ -149,7 +149,7 @@ public class RedeliveryPolicyTest extends JmsTestSupport {
/**
* @throws Exception
*/
public void testNornalRedeliveryPolicyDelaysDeliveryOnRollback() throws Exception {
public void testNormalRedeliveryPolicyDelaysDeliveryOnRollback() throws Exception {
// Receive a message with the JMS API
RedeliveryPolicy policy = connection.getRedeliveryPolicy();
@ -742,7 +742,7 @@ public class RedeliveryPolicyTest extends JmsTestSupport {
public void onMessage(Message message) {
try {
ActiveMQTextMessage m = (ActiveMQTextMessage) message;
LOG.info("Got: " + ((ActiveMQTextMessage) message).getMessageId() + ", seq:" + ((ActiveMQTextMessage) message).getMessageId().getBrokerSequenceId());
LOG.info("Got: " + ((ActiveMQTextMessage) message).getMessageId() + ", seq:" + ((ActiveMQTextMessage) message).getMessageId().getBrokerSequenceId() + ", redeliveryCount: " + m.getRedeliveryCounter());
assertEquals("1st", m.getText());
assertEquals(receivedCount.get(), m.getRedeliveryCounter());
receivedCount.incrementAndGet();
@ -802,6 +802,108 @@ public class RedeliveryPolicyTest extends JmsTestSupport {
}
public void testRepeatedRedeliveryNoCommitForwardMessage() throws Exception {
connection.start();
Session dlqSession = connection.createSession(true, Session.SESSION_TRANSACTED);
ActiveMQQueue destination = new ActiveMQQueue("TEST");
MessageProducer producer = dlqSession.createProducer(destination);
// Send the messages
producer.send(dlqSession.createTextMessage("1st"));
dlqSession.commit();
MessageConsumer dlqConsumer = dlqSession.createConsumer(new ActiveMQQueue("ActiveMQ.DLQ"));
final MessageProducer forwardingProducer = dlqSession.createProducer(new ActiveMQQueue("TEST_FORWARD"));
// Send the messages
final int maxRedeliveries = 2;
final AtomicInteger receivedCount = new AtomicInteger(0);
for (int i=0;i<=maxRedeliveries+1;i++) {
connection = (ActiveMQConnection)factory.createConnection(userName, password);
connections.add(connection);
RedeliveryPolicy policy = connection.getRedeliveryPolicy();
policy.setInitialRedeliveryDelay(0);
policy.setUseExponentialBackOff(false);
policy.setMaximumRedeliveries(maxRedeliveries);
connection.start();
final CountDownLatch done = new CountDownLatch(1);
final ActiveMQSession session = (ActiveMQSession) connection.createSession(true, Session.SESSION_TRANSACTED);
session.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
try {
ActiveMQTextMessage m = (ActiveMQTextMessage) message;
LOG.info("Got: " + ((ActiveMQTextMessage) message).getMessageId() + ", seq:" + ((ActiveMQTextMessage) message).getMessageId().getBrokerSequenceId() + " ,Redelivery: " + m.getRedeliveryCounter());
assertEquals("1st", m.getText());
assertEquals(receivedCount.get(), m.getRedeliveryCounter());
receivedCount.incrementAndGet();
// do a forward of the received message, will reset the messageID
forwardingProducer.send(message);
done.countDown();
} catch (Exception ignored) {
ignored.printStackTrace();
}
}
});
connection.createConnectionConsumer(
destination,
null,
new ServerSessionPool() {
@Override
public ServerSession getServerSession() throws JMSException {
return new ServerSession() {
@Override
public Session getSession() throws JMSException {
return session;
}
@Override
public void start() throws JMSException {
}
};
}
},
100,
false);
Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
session.run();
return done.await(10, TimeUnit.MILLISECONDS);
}
}, 5000);
if (i<=maxRedeliveries) {
assertTrue("listener done @" + i, done.await(5, TimeUnit.SECONDS));
} else {
// final redelivery gets poisoned before dispatch
assertFalse("listener not done @" + i, done.await(5, TimeUnit.SECONDS));
}
connection.close();
connections.remove(connection);
}
// We should be able to get the message off the DLQ now.
TextMessage m = (TextMessage)dlqConsumer.receive(1000);
assertNotNull("Got message from DLQ", m);
assertEquals("1st", m.getText());
String cause = m.getStringProperty(ActiveMQMessage.DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY);
assertTrue("cause exception has policy ref", cause.contains("RedeliveryPolicy"));
dlqSession.commit();
}
public void testRedeliveryRollbackWithDelayBlocking() throws Exception
{
redeliveryRollbackWithDelay(true);