diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/AbstractDeadLetterStrategy.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/AbstractDeadLetterStrategy.java index ed376b7ddc..fa6532b33f 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/AbstractDeadLetterStrategy.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/AbstractDeadLetterStrategy.java @@ -107,4 +107,21 @@ public abstract class AbstractDeadLetterStrategy implements DeadLetterStrategy { public void setExpiration(long expiration) { this.expiration = expiration; } + + public int getMaxProducersToAudit() { + return messageAudit.getMaximumNumberOfProducersToTrack(); + } + + public void setMaxProducersToAudit(int maxProducersToAudit) { + messageAudit.setMaximumNumberOfProducersToTrack(maxProducersToAudit); + } + + public void setMaxAuditDepth(int maxAuditDepth) { + messageAudit.setAuditDepth(maxAuditDepth); + } + + public int getMaxAuditDepth() { + return messageAudit.getAuditDepth(); + } + } diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/DeadLetterExpiryTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/DeadLetterExpiryTest.java index 25a98b9bab..37a603978e 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/DeadLetterExpiryTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/DeadLetterExpiryTest.java @@ -16,6 +16,7 @@ */ package org.apache.activemq.broker.policy; +import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.Message; import javax.jms.Queue; @@ -63,6 +64,23 @@ public class DeadLetterExpiryTest extends DeadLetterTest { pMap.put(new ActiveMQQueue("loop"), buggyLoopingDLQPolicy); pMap.put(new ActiveMQQueue("DLQ.loop"), buggyLoopingDLQPolicy); + SharedDeadLetterStrategy auditConfigured = new SharedDeadLetterStrategy(); + auditConfigured.setDeadLetterQueue(new ActiveMQQueue("DLQ.auditConfigured")); + auditConfigured.setProcessNonPersistent(true); + auditConfigured.setProcessExpired(true); + auditConfigured.setMaxProducersToAudit(1); + auditConfigured.setMaxAuditDepth(10); + PolicyEntry auditConfiguredDlqPolicy = new PolicyEntry(); + auditConfiguredDlqPolicy.setDeadLetterStrategy(auditConfigured); + auditConfiguredDlqPolicy.setExpireMessagesPeriod(1000); + + pMap.put(new ActiveMQQueue("Comp.One"), auditConfiguredDlqPolicy); + pMap.put(new ActiveMQQueue("Comp.Two"), auditConfiguredDlqPolicy); + + PolicyEntry auditConfiguredPolicy = new PolicyEntry(); + auditConfiguredPolicy.setEnableAudit(false); // allow duplicates through the cursor + pMap.put(new ActiveMQQueue("DLQ.auditConfigured"), auditConfiguredPolicy); + PolicyEntry policyWithExpiryProcessing = pMap.getDefaultEntry(); policyWithExpiryProcessing.setExpireMessagesPeriod(1000); pMap.setDefaultEntry(policyWithExpiryProcessing); @@ -122,6 +140,29 @@ public class DeadLetterExpiryTest extends DeadLetterTest { } + public void testAuditConfigured() throws Exception { + destination = new ActiveMQQueue("Comp.One,Comp.Two"); + connection.start(); + + messageCount = 1; + timeToLive = 2000; + deliveryMode = DeliveryMode.NON_PERSISTENT; + sendMessages(); + sendMessages(); + + assertTrue("all messages expired even duplicates!", Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + try { + QueueViewMBean queueViewMBean = getProxyToQueue("DLQ.auditConfigured"); + LOG.info("Queue " + queueViewMBean.getName() + ", size:" + queueViewMBean.getQueueSize()); + return queueViewMBean.getQueueSize() == 4; + } catch (Exception expectedTillExpiry) {} + return false; + } + })); + } + public void testNoDLQLoop() throws Exception { destination = new ActiveMQQueue("loop"); messageCount = 2; @@ -170,6 +211,8 @@ public class DeadLetterExpiryTest extends DeadLetterTest { protected void setUp() throws Exception { transactedMode = true; + deliveryMode = DeliveryMode.PERSISTENT; + timeToLive = 0; super.setUp(); }