From 97502bd61edd9a7f817a243931efe914b6fdf28b Mon Sep 17 00:00:00 2001 From: "Timothy A. Bish" Date: Mon, 17 Jun 2013 22:25:39 +0000 Subject: [PATCH] Fix and updated test for: https://issues.apache.org/jira/browse/AMQ-3405 git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1493958 13f79535-47bb-0310-9956-ffa450edef68 --- .../apache/activemq/broker/region/Queue.java | 9 +++++++-- .../policy/AbstractDeadLetterStrategy.java | 17 ++++++++++++++--- .../region/policy/DeadLetterStrategy.java | 15 ++++++++++++--- .../org/apache/activemq/bugs/AMQ3405Test.java | 3 --- 4 files changed, 33 insertions(+), 11 deletions(-) diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java index 2864efa6dd..bddcbcd6c5 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java @@ -59,6 +59,7 @@ import org.apache.activemq.broker.region.cursors.VMPendingMessageCursor; import org.apache.activemq.broker.region.group.MessageGroupHashBucketFactory; import org.apache.activemq.broker.region.group.MessageGroupMap; import org.apache.activemq.broker.region.group.MessageGroupMapFactory; +import org.apache.activemq.broker.region.policy.DeadLetterStrategy; import org.apache.activemq.broker.region.policy.DispatchPolicy; import org.apache.activemq.broker.region.policy.RoundRobinDispatchPolicy; import org.apache.activemq.broker.util.InsertionCountList; @@ -1451,9 +1452,13 @@ public class Queue extends BaseDestination implements Task, UsageListener { BrokerSupport.resend(context, m.getMessage(), dest); removeMessage(context, m); messagesLock.writeLock().lock(); - try{ + try { messages.rollback(m.getMessageId()); - }finally { + if (isDLQ()) { + DeadLetterStrategy stratagy = getDeadLetterStrategy(); + stratagy.rollback(m.getMessage()); + } + } finally { messagesLock.writeLock().unlock(); } return true; diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/AbstractDeadLetterStrategy.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/AbstractDeadLetterStrategy.java index b2595a1bee..d35f610e7e 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/AbstractDeadLetterStrategy.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/AbstractDeadLetterStrategy.java @@ -24,16 +24,23 @@ import org.slf4j.LoggerFactory; /** * A strategy for choosing which destination is used for dead letter queue * messages. - * - * + * */ public abstract class AbstractDeadLetterStrategy implements DeadLetterStrategy { private static final Logger LOG = LoggerFactory.getLogger(AbstractDeadLetterStrategy.class); private boolean processNonPersistent = false; private boolean processExpired = true; private boolean enableAudit = true; - private ActiveMQMessageAudit messageAudit = new ActiveMQMessageAudit(); + private final ActiveMQMessageAudit messageAudit = new ActiveMQMessageAudit(); + @Override + public void rollback(Message message) { + if (message != null && this.enableAudit) { + messageAudit.rollback(message); + } + } + + @Override public boolean isSendToDeadLetterQueue(Message message) { boolean result = false; if (message != null) { @@ -57,6 +64,7 @@ public abstract class AbstractDeadLetterStrategy implements DeadLetterStrategy { /** * @return the processExpired */ + @Override public boolean isProcessExpired() { return this.processExpired; } @@ -64,6 +72,7 @@ public abstract class AbstractDeadLetterStrategy implements DeadLetterStrategy { /** * @param processExpired the processExpired to set */ + @Override public void setProcessExpired(boolean processExpired) { this.processExpired = processExpired; } @@ -71,6 +80,7 @@ public abstract class AbstractDeadLetterStrategy implements DeadLetterStrategy { /** * @return the processNonPersistent */ + @Override public boolean isProcessNonPersistent() { return this.processNonPersistent; } @@ -78,6 +88,7 @@ public abstract class AbstractDeadLetterStrategy implements DeadLetterStrategy { /** * @param processNonPersistent the processNonPersistent to set */ + @Override public void setProcessNonPersistent(boolean processNonPersistent) { this.processNonPersistent = processNonPersistent; } diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/DeadLetterStrategy.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/DeadLetterStrategy.java index 6d8a44684e..5affb7223a 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/DeadLetterStrategy.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/DeadLetterStrategy.java @@ -22,11 +22,11 @@ import org.apache.activemq.command.Message; /** * A strategy for choosing which destination is used for dead letter queue messages. - * - * + * + * */ public interface DeadLetterStrategy { - + /** * Allow pluggable strategy for deciding if message should be sent to a dead letter queue * for example, you might not want to ignore expired or non-persistent messages @@ -62,4 +62,13 @@ public interface DeadLetterStrategy { public boolean isDLQ(ActiveMQDestination destination); + /** + * Allows for a Message that was already processed by a DLQ to be rolled back in case + * of a move or a retry of that message, otherwise the Message would be considered a + * duplicate if this strategy is doing Message Auditing. + * + * @param message + */ + public void rollback(Message message); + } diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3405Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3405Test.java index 0b02755106..9711d06361 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3405Test.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3405Test.java @@ -39,7 +39,6 @@ import org.apache.activemq.RedeliveryPolicy; import org.apache.activemq.TestSupport; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.jmx.QueueViewMBean; -import org.apache.activemq.broker.region.policy.AbstractDeadLetterStrategy; import org.apache.activemq.broker.region.policy.DeadLetterStrategy; import org.apache.activemq.broker.region.policy.PolicyEntry; import org.apache.activemq.broker.region.policy.PolicyMap; @@ -79,11 +78,9 @@ public class AMQ3405Test extends TestSupport { BrokerService broker = new BrokerService(); broker.setPersistent(false); PolicyEntry policy = new PolicyEntry(); - policy.setEnableAudit(false); DeadLetterStrategy defaultDeadLetterStrategy = policy.getDeadLetterStrategy(); if(defaultDeadLetterStrategy!=null) { defaultDeadLetterStrategy.setProcessNonPersistent(true); - ((AbstractDeadLetterStrategy) defaultDeadLetterStrategy).setEnableAudit(false); } PolicyMap pMap = new PolicyMap(); pMap.setDefaultEntry(policy);