diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/AbstractDeadLetterStrategy.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/AbstractDeadLetterStrategy.java index 82e07560ec..2421206810 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/AbstractDeadLetterStrategy.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/AbstractDeadLetterStrategy.java @@ -36,7 +36,7 @@ public abstract class AbstractDeadLetterStrategy implements DeadLetterStrategy { @Override public void rollback(Message message) { if (message != null && this.enableAudit) { - lookupActiveMQMessageAudit(message).rollback(message); + lookupActiveMQMessageAudit(message, true).rollback(message); } } @@ -45,7 +45,7 @@ public abstract class AbstractDeadLetterStrategy implements DeadLetterStrategy { boolean result = false; if (message != null) { result = true; - if (enableAudit && lookupActiveMQMessageAudit(message).isDuplicate(message)) { + if (enableAudit && lookupActiveMQMessageAudit(message, false).isDuplicate(message)) { result = false; LOG.debug("Not adding duplicate to DLQ: {}, dest: {}", message.getMessageId(), message.getDestination()); } @@ -115,5 +115,5 @@ public abstract class AbstractDeadLetterStrategy implements DeadLetterStrategy { public abstract int getMaxAuditDepth(); - protected abstract ActiveMQMessageAudit lookupActiveMQMessageAudit(Message message); + protected abstract ActiveMQMessageAudit lookupActiveMQMessageAudit(Message message, boolean rollback); } diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/IndividualDeadLetterStrategy.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/IndividualDeadLetterStrategy.java index 3dd41ae0bb..f626365fa9 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/IndividualDeadLetterStrategy.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/IndividualDeadLetterStrategy.java @@ -202,15 +202,23 @@ public class IndividualDeadLetterStrategy extends AbstractDeadLetterStrategy { } @Override - protected ActiveMQMessageAudit lookupActiveMQMessageAudit(Message message) { + protected ActiveMQMessageAudit lookupActiveMQMessageAudit(Message message, boolean rollback) { ActiveMQMessageAudit messageAudit; synchronized(dedicatedMessageAudits) { - messageAudit = dedicatedMessageAudits.get(message.getDestination().getQualifiedName()); + // Normally we want to just use the destination property on the message as the key for the map for + // caching the messageAudit object for each destination. However, when rolling back, the message + // provided here has had its destination changed to the individual DLQ destination and is no longer + // the original destination. So to find the correct messageAudit to rollback we need to use + // the originalDestination property on the message to get the correct destination that was + // used to first cache the messageAudit. + final String destinationName = rollback && message.getOriginalDestination() != null ? + message.getOriginalDestination().getQualifiedName() : message.getDestination().getQualifiedName(); + messageAudit = dedicatedMessageAudits.get(destinationName); if(messageAudit == null) { messageAudit = new ActiveMQMessageAudit(getMaxAuditDepth(), getMaxProducersToAudit()); - dedicatedMessageAudits.put(message.getDestination().getQualifiedName(), messageAudit); + dedicatedMessageAudits.put(destinationName, messageAudit); } return messageAudit; diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/SharedDeadLetterStrategy.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/SharedDeadLetterStrategy.java index 8a78e83cf3..a3dfe0e74e 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/SharedDeadLetterStrategy.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/SharedDeadLetterStrategy.java @@ -71,7 +71,7 @@ public class SharedDeadLetterStrategy extends AbstractDeadLetterStrategy { } @Override - protected ActiveMQMessageAudit lookupActiveMQMessageAudit(Message message) { + protected ActiveMQMessageAudit lookupActiveMQMessageAudit(Message message, boolean rollback) { return messageAudit; }