AMQ-9217 - Fix IndividualDeadLetter strategy rollback

This fixes the rollback after the latest changes by using the
originalDestination property to look up the correct message audit on
rollback
This commit is contained in:
Christopher L. Shannon (cshannon) 2023-02-16 07:21:20 -05:00
parent ee14778305
commit 459388185a
3 changed files with 15 additions and 7 deletions

View File

@ -36,7 +36,7 @@ public abstract class AbstractDeadLetterStrategy implements DeadLetterStrategy {
@Override @Override
public void rollback(Message message) { public void rollback(Message message) {
if (message != null && this.enableAudit) { if (message != null && this.enableAudit) {
lookupActiveMQMessageAudit(message).rollback(message); lookupActiveMQMessageAudit(message, true).rollback(message);
} }
} }
@ -45,7 +45,7 @@ public abstract class AbstractDeadLetterStrategy implements DeadLetterStrategy {
boolean result = false; boolean result = false;
if (message != null) { if (message != null) {
result = true; result = true;
if (enableAudit && lookupActiveMQMessageAudit(message).isDuplicate(message)) { if (enableAudit && lookupActiveMQMessageAudit(message, false).isDuplicate(message)) {
result = false; result = false;
LOG.debug("Not adding duplicate to DLQ: {}, dest: {}", message.getMessageId(), message.getDestination()); LOG.debug("Not adding duplicate to DLQ: {}, dest: {}", message.getMessageId(), message.getDestination());
} }
@ -115,5 +115,5 @@ public abstract class AbstractDeadLetterStrategy implements DeadLetterStrategy {
public abstract int getMaxAuditDepth(); public abstract int getMaxAuditDepth();
protected abstract ActiveMQMessageAudit lookupActiveMQMessageAudit(Message message); protected abstract ActiveMQMessageAudit lookupActiveMQMessageAudit(Message message, boolean rollback);
} }

View File

@ -202,15 +202,23 @@ public class IndividualDeadLetterStrategy extends AbstractDeadLetterStrategy {
} }
@Override @Override
protected ActiveMQMessageAudit lookupActiveMQMessageAudit(Message message) { protected ActiveMQMessageAudit lookupActiveMQMessageAudit(Message message, boolean rollback) {
ActiveMQMessageAudit messageAudit; ActiveMQMessageAudit messageAudit;
synchronized(dedicatedMessageAudits) { synchronized(dedicatedMessageAudits) {
messageAudit = dedicatedMessageAudits.get(message.getDestination().getQualifiedName()); // Normally we want to just use the destination property on the message as the key for the map for
// caching the messageAudit object for each destination. However, when rolling back, the message
// provided here has had its destination changed to the individual DLQ destination and is no longer
// the original destination. So to find the correct messageAudit to rollback we need to use
// the originalDestination property on the message to get the correct destination that was
// used to first cache the messageAudit.
final String destinationName = rollback && message.getOriginalDestination() != null ?
message.getOriginalDestination().getQualifiedName() : message.getDestination().getQualifiedName();
messageAudit = dedicatedMessageAudits.get(destinationName);
if(messageAudit == null) { if(messageAudit == null) {
messageAudit = new ActiveMQMessageAudit(getMaxAuditDepth(), getMaxProducersToAudit()); messageAudit = new ActiveMQMessageAudit(getMaxAuditDepth(), getMaxProducersToAudit());
dedicatedMessageAudits.put(message.getDestination().getQualifiedName(), messageAudit); dedicatedMessageAudits.put(destinationName, messageAudit);
} }
return messageAudit; return messageAudit;

View File

@ -71,7 +71,7 @@ public class SharedDeadLetterStrategy extends AbstractDeadLetterStrategy {
} }
@Override @Override
protected ActiveMQMessageAudit lookupActiveMQMessageAudit(Message message) { protected ActiveMQMessageAudit lookupActiveMQMessageAudit(Message message, boolean rollback) {
return messageAudit; return messageAudit;
} }