git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1493958 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Timothy A. Bish 2013-06-17 22:25:39 +00:00
parent 8db5d9599d
commit 97502bd61e
4 changed files with 33 additions and 11 deletions

View File

@ -59,6 +59,7 @@ import org.apache.activemq.broker.region.cursors.VMPendingMessageCursor;
import org.apache.activemq.broker.region.group.MessageGroupHashBucketFactory; import org.apache.activemq.broker.region.group.MessageGroupHashBucketFactory;
import org.apache.activemq.broker.region.group.MessageGroupMap; import org.apache.activemq.broker.region.group.MessageGroupMap;
import org.apache.activemq.broker.region.group.MessageGroupMapFactory; import org.apache.activemq.broker.region.group.MessageGroupMapFactory;
import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
import org.apache.activemq.broker.region.policy.DispatchPolicy; import org.apache.activemq.broker.region.policy.DispatchPolicy;
import org.apache.activemq.broker.region.policy.RoundRobinDispatchPolicy; import org.apache.activemq.broker.region.policy.RoundRobinDispatchPolicy;
import org.apache.activemq.broker.util.InsertionCountList; import org.apache.activemq.broker.util.InsertionCountList;
@ -1451,9 +1452,13 @@ public class Queue extends BaseDestination implements Task, UsageListener {
BrokerSupport.resend(context, m.getMessage(), dest); BrokerSupport.resend(context, m.getMessage(), dest);
removeMessage(context, m); removeMessage(context, m);
messagesLock.writeLock().lock(); messagesLock.writeLock().lock();
try{ try {
messages.rollback(m.getMessageId()); messages.rollback(m.getMessageId());
}finally { if (isDLQ()) {
DeadLetterStrategy stratagy = getDeadLetterStrategy();
stratagy.rollback(m.getMessage());
}
} finally {
messagesLock.writeLock().unlock(); messagesLock.writeLock().unlock();
} }
return true; return true;

View File

@ -24,16 +24,23 @@ import org.slf4j.LoggerFactory;
/** /**
* A strategy for choosing which destination is used for dead letter queue * A strategy for choosing which destination is used for dead letter queue
* messages. * messages.
* *
*
*/ */
public abstract class AbstractDeadLetterStrategy implements DeadLetterStrategy { public abstract class AbstractDeadLetterStrategy implements DeadLetterStrategy {
private static final Logger LOG = LoggerFactory.getLogger(AbstractDeadLetterStrategy.class); private static final Logger LOG = LoggerFactory.getLogger(AbstractDeadLetterStrategy.class);
private boolean processNonPersistent = false; private boolean processNonPersistent = false;
private boolean processExpired = true; private boolean processExpired = true;
private boolean enableAudit = true; private boolean enableAudit = true;
private ActiveMQMessageAudit messageAudit = new ActiveMQMessageAudit(); private final ActiveMQMessageAudit messageAudit = new ActiveMQMessageAudit();
@Override
public void rollback(Message message) {
if (message != null && this.enableAudit) {
messageAudit.rollback(message);
}
}
@Override
public boolean isSendToDeadLetterQueue(Message message) { public boolean isSendToDeadLetterQueue(Message message) {
boolean result = false; boolean result = false;
if (message != null) { if (message != null) {
@ -57,6 +64,7 @@ public abstract class AbstractDeadLetterStrategy implements DeadLetterStrategy {
/** /**
* @return the processExpired * @return the processExpired
*/ */
@Override
public boolean isProcessExpired() { public boolean isProcessExpired() {
return this.processExpired; return this.processExpired;
} }
@ -64,6 +72,7 @@ public abstract class AbstractDeadLetterStrategy implements DeadLetterStrategy {
/** /**
* @param processExpired the processExpired to set * @param processExpired the processExpired to set
*/ */
@Override
public void setProcessExpired(boolean processExpired) { public void setProcessExpired(boolean processExpired) {
this.processExpired = processExpired; this.processExpired = processExpired;
} }
@ -71,6 +80,7 @@ public abstract class AbstractDeadLetterStrategy implements DeadLetterStrategy {
/** /**
* @return the processNonPersistent * @return the processNonPersistent
*/ */
@Override
public boolean isProcessNonPersistent() { public boolean isProcessNonPersistent() {
return this.processNonPersistent; return this.processNonPersistent;
} }
@ -78,6 +88,7 @@ public abstract class AbstractDeadLetterStrategy implements DeadLetterStrategy {
/** /**
* @param processNonPersistent the processNonPersistent to set * @param processNonPersistent the processNonPersistent to set
*/ */
@Override
public void setProcessNonPersistent(boolean processNonPersistent) { public void setProcessNonPersistent(boolean processNonPersistent) {
this.processNonPersistent = processNonPersistent; this.processNonPersistent = processNonPersistent;
} }

View File

@ -22,11 +22,11 @@ import org.apache.activemq.command.Message;
/** /**
* A strategy for choosing which destination is used for dead letter queue messages. * A strategy for choosing which destination is used for dead letter queue messages.
* *
* *
*/ */
public interface DeadLetterStrategy { public interface DeadLetterStrategy {
/** /**
* Allow pluggable strategy for deciding if message should be sent to a dead letter queue * Allow pluggable strategy for deciding if message should be sent to a dead letter queue
* for example, you might not want to ignore expired or non-persistent messages * for example, you might not want to ignore expired or non-persistent messages
@ -62,4 +62,13 @@ public interface DeadLetterStrategy {
public boolean isDLQ(ActiveMQDestination destination); public boolean isDLQ(ActiveMQDestination destination);
/**
* Allows for a Message that was already processed by a DLQ to be rolled back in case
* of a move or a retry of that message, otherwise the Message would be considered a
* duplicate if this strategy is doing Message Auditing.
*
* @param message
*/
public void rollback(Message message);
} }

View File

@ -39,7 +39,6 @@ import org.apache.activemq.RedeliveryPolicy;
import org.apache.activemq.TestSupport; import org.apache.activemq.TestSupport;
import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.jmx.QueueViewMBean; import org.apache.activemq.broker.jmx.QueueViewMBean;
import org.apache.activemq.broker.region.policy.AbstractDeadLetterStrategy;
import org.apache.activemq.broker.region.policy.DeadLetterStrategy; import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
import org.apache.activemq.broker.region.policy.PolicyEntry; import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap; import org.apache.activemq.broker.region.policy.PolicyMap;
@ -79,11 +78,9 @@ public class AMQ3405Test extends TestSupport {
BrokerService broker = new BrokerService(); BrokerService broker = new BrokerService();
broker.setPersistent(false); broker.setPersistent(false);
PolicyEntry policy = new PolicyEntry(); PolicyEntry policy = new PolicyEntry();
policy.setEnableAudit(false);
DeadLetterStrategy defaultDeadLetterStrategy = policy.getDeadLetterStrategy(); DeadLetterStrategy defaultDeadLetterStrategy = policy.getDeadLetterStrategy();
if(defaultDeadLetterStrategy!=null) { if(defaultDeadLetterStrategy!=null) {
defaultDeadLetterStrategy.setProcessNonPersistent(true); defaultDeadLetterStrategy.setProcessNonPersistent(true);
((AbstractDeadLetterStrategy) defaultDeadLetterStrategy).setEnableAudit(false);
} }
PolicyMap pMap = new PolicyMap(); PolicyMap pMap = new PolicyMap();
pMap.setDefaultEntry(policy); pMap.setDefaultEntry(policy);