[AMQ-6702] add maxProducersToAudit and maxAuditDepth to dead letter strategy to cofigure the audit, fix and test

This commit is contained in:
gtully 2017-06-13 14:26:15 +01:00
parent a1e595c18f
commit b6cb0eacea
2 changed files with 60 additions and 0 deletions

View File

@ -107,4 +107,21 @@ public abstract class AbstractDeadLetterStrategy implements DeadLetterStrategy {
public void setExpiration(long expiration) { public void setExpiration(long expiration) {
this.expiration = expiration; this.expiration = expiration;
} }
public int getMaxProducersToAudit() {
return messageAudit.getMaximumNumberOfProducersToTrack();
}
public void setMaxProducersToAudit(int maxProducersToAudit) {
messageAudit.setMaximumNumberOfProducersToTrack(maxProducersToAudit);
}
public void setMaxAuditDepth(int maxAuditDepth) {
messageAudit.setAuditDepth(maxAuditDepth);
}
public int getMaxAuditDepth() {
return messageAudit.getAuditDepth();
}
} }

View File

@ -16,6 +16,7 @@
*/ */
package org.apache.activemq.broker.policy; package org.apache.activemq.broker.policy;
import javax.jms.DeliveryMode;
import javax.jms.Destination; import javax.jms.Destination;
import javax.jms.Message; import javax.jms.Message;
import javax.jms.Queue; import javax.jms.Queue;
@ -63,6 +64,23 @@ public class DeadLetterExpiryTest extends DeadLetterTest {
pMap.put(new ActiveMQQueue("loop"), buggyLoopingDLQPolicy); pMap.put(new ActiveMQQueue("loop"), buggyLoopingDLQPolicy);
pMap.put(new ActiveMQQueue("DLQ.loop"), buggyLoopingDLQPolicy); pMap.put(new ActiveMQQueue("DLQ.loop"), buggyLoopingDLQPolicy);
SharedDeadLetterStrategy auditConfigured = new SharedDeadLetterStrategy();
auditConfigured.setDeadLetterQueue(new ActiveMQQueue("DLQ.auditConfigured"));
auditConfigured.setProcessNonPersistent(true);
auditConfigured.setProcessExpired(true);
auditConfigured.setMaxProducersToAudit(1);
auditConfigured.setMaxAuditDepth(10);
PolicyEntry auditConfiguredDlqPolicy = new PolicyEntry();
auditConfiguredDlqPolicy.setDeadLetterStrategy(auditConfigured);
auditConfiguredDlqPolicy.setExpireMessagesPeriod(1000);
pMap.put(new ActiveMQQueue("Comp.One"), auditConfiguredDlqPolicy);
pMap.put(new ActiveMQQueue("Comp.Two"), auditConfiguredDlqPolicy);
PolicyEntry auditConfiguredPolicy = new PolicyEntry();
auditConfiguredPolicy.setEnableAudit(false); // allow duplicates through the cursor
pMap.put(new ActiveMQQueue("DLQ.auditConfigured"), auditConfiguredPolicy);
PolicyEntry policyWithExpiryProcessing = pMap.getDefaultEntry(); PolicyEntry policyWithExpiryProcessing = pMap.getDefaultEntry();
policyWithExpiryProcessing.setExpireMessagesPeriod(1000); policyWithExpiryProcessing.setExpireMessagesPeriod(1000);
pMap.setDefaultEntry(policyWithExpiryProcessing); pMap.setDefaultEntry(policyWithExpiryProcessing);
@ -122,6 +140,29 @@ public class DeadLetterExpiryTest extends DeadLetterTest {
} }
public void testAuditConfigured() throws Exception {
destination = new ActiveMQQueue("Comp.One,Comp.Two");
connection.start();
messageCount = 1;
timeToLive = 2000;
deliveryMode = DeliveryMode.NON_PERSISTENT;
sendMessages();
sendMessages();
assertTrue("all messages expired even duplicates!", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
try {
QueueViewMBean queueViewMBean = getProxyToQueue("DLQ.auditConfigured");
LOG.info("Queue " + queueViewMBean.getName() + ", size:" + queueViewMBean.getQueueSize());
return queueViewMBean.getQueueSize() == 4;
} catch (Exception expectedTillExpiry) {}
return false;
}
}));
}
public void testNoDLQLoop() throws Exception { public void testNoDLQLoop() throws Exception {
destination = new ActiveMQQueue("loop"); destination = new ActiveMQQueue("loop");
messageCount = 2; messageCount = 2;
@ -170,6 +211,8 @@ public class DeadLetterExpiryTest extends DeadLetterTest {
protected void setUp() throws Exception { protected void setUp() throws Exception {
transactedMode = true; transactedMode = true;
deliveryMode = DeliveryMode.PERSISTENT;
timeToLive = 0;
super.setUp(); super.setUp();
} }