mirror of https://github.com/apache/activemq.git
Merge pull request #980 from cshannon/amq-9217-rollback
AMQ-9217 - Fix IndividualDeadLetter strategy when retrying messages
This commit is contained in:
commit
ae4e305f85
|
@ -36,7 +36,7 @@ public abstract class AbstractDeadLetterStrategy implements DeadLetterStrategy {
|
||||||
@Override
|
@Override
|
||||||
public void rollback(Message message) {
|
public void rollback(Message message) {
|
||||||
if (message != null && this.enableAudit) {
|
if (message != null && this.enableAudit) {
|
||||||
lookupActiveMQMessageAudit(message).rollback(message);
|
lookupActiveMQMessageAudit(message, true).rollback(message);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -45,7 +45,7 @@ public abstract class AbstractDeadLetterStrategy implements DeadLetterStrategy {
|
||||||
boolean result = false;
|
boolean result = false;
|
||||||
if (message != null) {
|
if (message != null) {
|
||||||
result = true;
|
result = true;
|
||||||
if (enableAudit && lookupActiveMQMessageAudit(message).isDuplicate(message)) {
|
if (enableAudit && lookupActiveMQMessageAudit(message, false).isDuplicate(message)) {
|
||||||
result = false;
|
result = false;
|
||||||
LOG.debug("Not adding duplicate to DLQ: {}, dest: {}", message.getMessageId(), message.getDestination());
|
LOG.debug("Not adding duplicate to DLQ: {}, dest: {}", message.getMessageId(), message.getDestination());
|
||||||
}
|
}
|
||||||
|
@ -115,5 +115,5 @@ public abstract class AbstractDeadLetterStrategy implements DeadLetterStrategy {
|
||||||
|
|
||||||
public abstract int getMaxAuditDepth();
|
public abstract int getMaxAuditDepth();
|
||||||
|
|
||||||
protected abstract ActiveMQMessageAudit lookupActiveMQMessageAudit(Message message);
|
protected abstract ActiveMQMessageAudit lookupActiveMQMessageAudit(Message message, boolean rollback);
|
||||||
}
|
}
|
||||||
|
|
|
@ -202,15 +202,23 @@ public class IndividualDeadLetterStrategy extends AbstractDeadLetterStrategy {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected ActiveMQMessageAudit lookupActiveMQMessageAudit(Message message) {
|
protected ActiveMQMessageAudit lookupActiveMQMessageAudit(Message message, boolean rollback) {
|
||||||
ActiveMQMessageAudit messageAudit;
|
ActiveMQMessageAudit messageAudit;
|
||||||
|
|
||||||
synchronized(dedicatedMessageAudits) {
|
synchronized(dedicatedMessageAudits) {
|
||||||
messageAudit = dedicatedMessageAudits.get(message.getDestination().getQualifiedName());
|
// Normally we want to just use the destination property on the message as the key for the map for
|
||||||
|
// caching the messageAudit object for each destination. However, when rolling back, the message
|
||||||
|
// provided here has had its destination changed to the individual DLQ destination and is no longer
|
||||||
|
// the original destination. So to find the correct messageAudit to rollback we need to use
|
||||||
|
// the originalDestination property on the message to get the correct destination that was
|
||||||
|
// used to first cache the messageAudit.
|
||||||
|
final String destinationName = rollback && message.getOriginalDestination() != null ?
|
||||||
|
message.getOriginalDestination().getQualifiedName() : message.getDestination().getQualifiedName();
|
||||||
|
messageAudit = dedicatedMessageAudits.get(destinationName);
|
||||||
|
|
||||||
if(messageAudit == null) {
|
if(messageAudit == null) {
|
||||||
messageAudit = new ActiveMQMessageAudit(getMaxAuditDepth(), getMaxProducersToAudit());
|
messageAudit = new ActiveMQMessageAudit(getMaxAuditDepth(), getMaxProducersToAudit());
|
||||||
dedicatedMessageAudits.put(message.getDestination().getQualifiedName(), messageAudit);
|
dedicatedMessageAudits.put(destinationName, messageAudit);
|
||||||
}
|
}
|
||||||
|
|
||||||
return messageAudit;
|
return messageAudit;
|
||||||
|
|
|
@ -71,7 +71,7 @@ public class SharedDeadLetterStrategy extends AbstractDeadLetterStrategy {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected ActiveMQMessageAudit lookupActiveMQMessageAudit(Message message) {
|
protected ActiveMQMessageAudit lookupActiveMQMessageAudit(Message message, boolean rollback) {
|
||||||
return messageAudit;
|
return messageAudit;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue