diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java index f8aacaac72..c698bdd535 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java @@ -832,7 +832,8 @@ public class RegionBroker extends EmptyBroker { } } else { if (LOG.isDebugEnabled()) { - LOG.debug("Expired message with no DLQ strategy in place"); + LOG.debug("Dead Letter message with no DLQ strategy in place, message id: " + + message.getMessageId() + ", destination: " + message.getDestination()); } } } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/AbstractDeadLetterStrategy.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/AbstractDeadLetterStrategy.java index 422dade1cb..7e98931dc3 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/AbstractDeadLetterStrategy.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/AbstractDeadLetterStrategy.java @@ -16,7 +16,10 @@ */ package org.apache.activemq.broker.region.policy; +import org.apache.activemq.ActiveMQMessageAudit; import org.apache.activemq.command.Message; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; /** * A strategy for choosing which destination is used for dead letter queue @@ -25,13 +28,21 @@ import org.apache.activemq.command.Message; * @version $Revision: 426366 $ */ public abstract class AbstractDeadLetterStrategy implements DeadLetterStrategy { + private static final Log LOG = LogFactory.getLog(AbstractDeadLetterStrategy.class); private boolean processNonPersistent = false; private boolean processExpired = true; + private ActiveMQMessageAudit audit = new ActiveMQMessageAudit(); public boolean isSendToDeadLetterQueue(Message message) { boolean result = false; if (message != null) { result = true; + if (audit.isDuplicate(message)) { + result = false; + if (LOG.isDebugEnabled()) { + LOG.debug("Not adding duplicate to DLQ: " + message.getMessageId() + ", dest: " + message.getDestination()); + } + } if (!message.isPersistent() && !processNonPersistent) { result = false; } diff --git a/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2584ConcurrentDlqTest.java b/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2584ConcurrentDlqTest.java index 5002521782..88ae816813 100644 --- a/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2584ConcurrentDlqTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2584ConcurrentDlqTest.java @@ -224,7 +224,12 @@ public class AMQ2584ConcurrentDlqTest extends org.apache.activemq.TestSupport { properties.put("maxFileLength", maxFileLengthVal); properties.put("cleanupInterval", "2000"); properties.put("checkpointInterval", "2000"); - properties.put("concurrentStoreAndDispatchQueues", "false"); + // there are problems with duplicate dispatch in the cursor, which maintain + // a map of messages. A dup dispatch can be dropped. + // see: org.apache.activemq.broker.region.cursors.OrderedPendingList + // Adding duplicate detection to the default DLQ strategy removes the problem + // which means we can leave the default for concurrent store and dispatch q + //properties.put("concurrentStoreAndDispatchQueues", "false"); IntrospectionSupport.setProperties(persistenceAdapter, properties); }