diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/AbstractDeadLetterStrategy.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/AbstractDeadLetterStrategy.java index fa6532b33f..82e07560ec 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/AbstractDeadLetterStrategy.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/AbstractDeadLetterStrategy.java @@ -31,13 +31,12 @@ public abstract class AbstractDeadLetterStrategy implements DeadLetterStrategy { private boolean processNonPersistent = false; private boolean processExpired = true; private boolean enableAudit = true; - private final ActiveMQMessageAudit messageAudit = new ActiveMQMessageAudit(); private long expiration; @Override public void rollback(Message message) { if (message != null && this.enableAudit) { - messageAudit.rollback(message); + lookupActiveMQMessageAudit(message).rollback(message); } } @@ -46,7 +45,7 @@ public abstract class AbstractDeadLetterStrategy implements DeadLetterStrategy { boolean result = false; if (message != null) { result = true; - if (enableAudit && messageAudit.isDuplicate(message)) { + if (enableAudit && lookupActiveMQMessageAudit(message).isDuplicate(message)) { result = false; LOG.debug("Not adding duplicate to DLQ: {}, dest: {}", message.getMessageId(), message.getDestination()); } @@ -108,20 +107,13 @@ public abstract class AbstractDeadLetterStrategy implements DeadLetterStrategy { this.expiration = expiration; } - public int getMaxProducersToAudit() { - return messageAudit.getMaximumNumberOfProducersToTrack(); - } + public abstract int getMaxProducersToAudit(); - public void setMaxProducersToAudit(int maxProducersToAudit) { - messageAudit.setMaximumNumberOfProducersToTrack(maxProducersToAudit); - } + public abstract void setMaxProducersToAudit(int maxProducersToAudit); - public void setMaxAuditDepth(int maxAuditDepth) { - messageAudit.setAuditDepth(maxAuditDepth); - } + public abstract void setMaxAuditDepth(int maxAuditDepth); - public int getMaxAuditDepth() { - return messageAudit.getAuditDepth(); - } + public abstract int getMaxAuditDepth(); + protected abstract ActiveMQMessageAudit lookupActiveMQMessageAudit(Message message); } diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/IndividualDeadLetterStrategy.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/IndividualDeadLetterStrategy.java index 1dfaa15667..3dd41ae0bb 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/IndividualDeadLetterStrategy.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/IndividualDeadLetterStrategy.java @@ -16,6 +16,7 @@ */ package org.apache.activemq.broker.region.policy; +import org.apache.activemq.ActiveMQMessageAudit; import org.apache.activemq.broker.region.Destination; import org.apache.activemq.broker.region.DurableTopicSubscription; import org.apache.activemq.broker.region.Subscription; @@ -23,6 +24,7 @@ import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQTopic; import org.apache.activemq.command.Message; +import org.apache.activemq.util.LRUCache; /** * A {@link DeadLetterStrategy} where each destination has its own individual @@ -40,6 +42,10 @@ public class IndividualDeadLetterStrategy extends AbstractDeadLetterStrategy { private boolean useQueueForQueueMessages = true; private boolean useQueueForTopicMessages = true; private boolean destinationPerDurableSubscriber; + private int maxAuditDepth = ActiveMQMessageAudit.DEFAULT_WINDOW_SIZE; + private int maxProducersToAudit = ActiveMQMessageAudit.MAXIMUM_PRODUCER_COUNT; + + private final LRUCache dedicatedMessageAudits = new LRUCache<>(10_000); public ActiveMQDestination getDeadLetterQueueFor(Message message, Subscription subscription) { if (message.getDestination().isQueue()) { @@ -51,6 +57,13 @@ public class IndividualDeadLetterStrategy extends AbstractDeadLetterStrategy { // Properties // ------------------------------------------------------------------------- + public int getMaxDestinationsToAudit() { + return dedicatedMessageAudits.getMaxCacheSize(); + } + + public void maxDestinationsToAudit(int maxDestinationsToAudit) { + this.dedicatedMessageAudits.setMaxCacheSize(maxDestinationsToAudit); + } public String getQueuePrefix() { return queuePrefix; @@ -134,6 +147,26 @@ public class IndividualDeadLetterStrategy extends AbstractDeadLetterStrategy { this.destinationPerDurableSubscriber = destinationPerDurableSubscriber; } + @Override + public int getMaxProducersToAudit() { + return this.maxProducersToAudit; + } + + @Override + public void setMaxProducersToAudit(int maxProducersToAudit) { + this.maxProducersToAudit = maxProducersToAudit; + } + + @Override + public void setMaxAuditDepth(int maxAuditDepth) { + this.maxAuditDepth = maxAuditDepth; + } + + @Override + public int getMaxAuditDepth() { + return this.maxAuditDepth; + } + // Implementation methods // ------------------------------------------------------------------------- protected ActiveMQDestination createDestination(Message message, @@ -168,4 +201,19 @@ public class IndividualDeadLetterStrategy extends AbstractDeadLetterStrategy { } } + @Override + protected ActiveMQMessageAudit lookupActiveMQMessageAudit(Message message) { + ActiveMQMessageAudit messageAudit; + + synchronized(dedicatedMessageAudits) { + messageAudit = dedicatedMessageAudits.get(message.getDestination().getQualifiedName()); + + if(messageAudit == null) { + messageAudit = new ActiveMQMessageAudit(getMaxAuditDepth(), getMaxProducersToAudit()); + dedicatedMessageAudits.put(message.getDestination().getQualifiedName(), messageAudit); + } + + return messageAudit; + } + } } \ No newline at end of file diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/SharedDeadLetterStrategy.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/SharedDeadLetterStrategy.java index 41f1f10280..8a78e83cf3 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/SharedDeadLetterStrategy.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/SharedDeadLetterStrategy.java @@ -16,6 +16,7 @@ */ package org.apache.activemq.broker.region.policy; +import org.apache.activemq.ActiveMQMessageAudit; import org.apache.activemq.broker.region.Subscription; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQQueue; @@ -35,6 +36,7 @@ public class SharedDeadLetterStrategy extends AbstractDeadLetterStrategy { public static final String DEFAULT_DEAD_LETTER_QUEUE_NAME = "ActiveMQ.DLQ"; private ActiveMQDestination deadLetterQueue = new ActiveMQQueue(DEFAULT_DEAD_LETTER_QUEUE_NAME); + private final ActiveMQMessageAudit messageAudit = new ActiveMQMessageAudit(); public ActiveMQDestination getDeadLetterQueueFor(Message message, Subscription subscription) { return deadLetterQueue; @@ -48,4 +50,29 @@ public class SharedDeadLetterStrategy extends AbstractDeadLetterStrategy { this.deadLetterQueue = deadLetterQueue; } + @Override + public int getMaxProducersToAudit() { + return messageAudit.getMaximumNumberOfProducersToTrack(); + } + + @Override + public void setMaxProducersToAudit(int maxProducersToAudit) { + messageAudit.setMaximumNumberOfProducersToTrack(maxProducersToAudit); + } + + @Override + public void setMaxAuditDepth(int maxAuditDepth) { + messageAudit.setAuditDepth(maxAuditDepth); + } + + @Override + public int getMaxAuditDepth() { + return messageAudit.getAuditDepth(); + } + + @Override + protected ActiveMQMessageAudit lookupActiveMQMessageAudit(Message message) { + return messageAudit; + } + } diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/IndividualDeadLetterTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/IndividualDeadLetterTest.java index 5dc4ae7a97..2b51ffa955 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/IndividualDeadLetterTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/IndividualDeadLetterTest.java @@ -16,22 +16,31 @@ */ package org.apache.activemq.broker.policy; +import java.util.Arrays; import java.util.Enumeration; +import java.util.Set; import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.Message; import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; import javax.jms.Queue; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.jmx.QueueViewMBean; import org.apache.activemq.broker.region.policy.DeadLetterStrategy; import org.apache.activemq.broker.region.policy.IndividualDeadLetterStrategy; import org.apache.activemq.broker.region.policy.PolicyEntry; import org.apache.activemq.broker.region.policy.PolicyMap; +import org.apache.activemq.broker.region.policy.SharedDeadLetterStrategy; +import org.apache.activemq.broker.region.virtual.CompositeQueue; +import org.apache.activemq.broker.region.virtual.VirtualDestination; +import org.apache.activemq.broker.region.virtual.VirtualDestinationInterceptor; import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQTopic; +import org.apache.activemq.util.Wait; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -48,12 +57,37 @@ public class IndividualDeadLetterTest extends DeadLetterTest { strategy.setProcessNonPersistent(true); policy.setDeadLetterStrategy(strategy); + PolicyEntry indvAuditPolicy = new PolicyEntry(); + IndividualDeadLetterStrategy indvAuditDlqStrategy = new IndividualDeadLetterStrategy(); + indvAuditDlqStrategy.setEnableAudit(true); + indvAuditPolicy.setDeadLetterStrategy(indvAuditDlqStrategy); + + PolicyEntry shrAuditPolicy = new PolicyEntry(); + SharedDeadLetterStrategy shrAuditDlqStrategy = new SharedDeadLetterStrategy(); + shrAuditDlqStrategy.setEnableAudit(true); + shrAuditPolicy.setDeadLetterStrategy(shrAuditDlqStrategy); + PolicyMap pMap = new PolicyMap(); pMap.put(new ActiveMQQueue(getDestinationString()), policy); pMap.put(new ActiveMQTopic(getDestinationString()), policy); + pMap.put(new ActiveMQQueue(getDestinationString() + ".INDV.>"), indvAuditPolicy); + pMap.put(new ActiveMQQueue(getDestinationString() + ".SHR.>"), shrAuditPolicy); broker.setDestinationPolicy(pMap); + CompositeQueue indvAuditCompQueue = new CompositeQueue(); + indvAuditCompQueue.setName(getDestinationString() + ".INDV.A"); + indvAuditCompQueue.setForwardOnly(true); + indvAuditCompQueue.setForwardTo(Arrays.asList(new ActiveMQQueue(getDestinationString() + ".INDV.B"), new ActiveMQQueue(getDestinationString() + ".INDV.C"))); + + CompositeQueue sharedAuditCompQueue = new CompositeQueue(); + sharedAuditCompQueue.setName(getDestinationString() + ".SHR.A"); + sharedAuditCompQueue.setForwardOnly(true); + sharedAuditCompQueue.setForwardTo(Arrays.asList(new ActiveMQQueue(getDestinationString() + ".SHR.B"), new ActiveMQQueue(getDestinationString() + ".SHR.C"))); + + VirtualDestinationInterceptor vdi = new VirtualDestinationInterceptor(); + vdi.setVirtualDestinations(new VirtualDestination[] { indvAuditCompQueue, sharedAuditCompQueue }); + broker.setDestinationInterceptors(new VirtualDestinationInterceptor[] {vdi}); return broker; } @@ -99,6 +133,99 @@ public class IndividualDeadLetterTest extends DeadLetterTest { assertNull("The message shouldn't be sent to another DLQ", testConsumer.receive(1000)); } + // AMQ-9217 + public void testPerDestinationAuditDefault() throws Exception { + ActiveMQConnection amqConnection = (ActiveMQConnection) connection; + rollbackCount = amqConnection.getRedeliveryPolicy().getMaximumRedeliveries() + 1; + + connection.start(); + session = connection.createSession(transactedMode, acknowledgeMode); + MessageProducer messageProducerA = session.createProducer(session.createQueue(getDestinationString() + ".INDV.A")); + messageProducerA.send(session.createTextMessage("testPerDestinationAuditEnabled")); + session.commit(); + + for(String destName : Set.of(getDestinationString() + ".INDV.B", getDestinationString() + ".INDV.C")) { + for (int i = 0; i < rollbackCount; i++) { + MessageConsumer indvConsumer = session.createConsumer(session.createQueue(destName)); + Message message = indvConsumer.receive(5000); + assertNotNull("No message received: ", message); + + session.rollback(); + LOG.info("Rolled back: " + rollbackCount + " times"); + indvConsumer.close(); + } + } + + QueueViewMBean a = getProxyToQueue(getDestinationString() + ".INDV.A"); + assertNotNull(a); + assertTrue(Wait.waitFor(() -> a.getEnqueueCount() == 0l, 3000, 250)); + assertTrue(Wait.waitFor(() -> a.getQueueSize() == 0l, 3000, 250)); + + QueueViewMBean b = getProxyToQueue(getDestinationString() + ".INDV.B"); + assertNotNull(b); + assertTrue(Wait.waitFor(() -> b.getEnqueueCount() == 1l, 3000, 250)); + assertTrue(Wait.waitFor(() -> b.getQueueSize() == 0l, 3000, 250)); + + QueueViewMBean c = getProxyToQueue(getDestinationString() + ".INDV.C"); + assertNotNull(c); + assertTrue(Wait.waitFor(() -> c.getEnqueueCount() == 1l, 3000, 250)); + assertTrue(Wait.waitFor(() -> c.getQueueSize() == 0l, 3000, 250)); + + QueueViewMBean bDlq = getProxyToQueue("ActiveMQ.DLQ.Queue." + getDestinationString() + ".INDV.B"); + assertNotNull(bDlq); + assertTrue(Wait.waitFor(() -> bDlq.getEnqueueCount() == 1l, 3000, 250)); + assertTrue(Wait.waitFor(() -> bDlq.getQueueSize() == 1l, 3000, 250)); + + QueueViewMBean cDlq = getProxyToQueue("ActiveMQ.DLQ.Queue." + getDestinationString() + ".INDV.C"); + assertNotNull(cDlq); + assertTrue(Wait.waitFor(() -> cDlq.getEnqueueCount() == 1, 3000, 250)); + assertTrue(Wait.waitFor(() -> cDlq.getQueueSize() == 1, 3000, 250)); + } + + public void testSharedDestinationAuditDropsMessages() throws Exception { + ActiveMQConnection amqConnection = (ActiveMQConnection) connection; + rollbackCount = amqConnection.getRedeliveryPolicy().getMaximumRedeliveries() + 1; + + connection.start(); + session = connection.createSession(transactedMode, acknowledgeMode); + MessageProducer messageProducerA = session.createProducer(session.createQueue(getDestinationString() + ".SHR.A")); + messageProducerA.send(session.createTextMessage("testSharedDestinationAuditDropsMessages")); + session.commit(); + + for(String destName : Set.of(getDestinationString() + ".SHR.B", getDestinationString() + ".SHR.C")) { + for (int i = 0; i < rollbackCount; i++) { + MessageConsumer shrConsumer = session.createConsumer(session.createQueue(destName)); + Message message = shrConsumer.receive(5000); + assertNotNull("No message received: ", message); + + session.rollback(); + LOG.info("Rolled back: " + rollbackCount + " times"); + shrConsumer.close(); + } + } + + QueueViewMBean a = getProxyToQueue(getDestinationString() + ".SHR.A"); + assertNotNull(a); + assertTrue(Wait.waitFor(() -> a.getEnqueueCount() == 0l, 3000, 250)); + assertTrue(Wait.waitFor(() -> a.getQueueSize() == 0l, 3000, 250)); + + QueueViewMBean b = getProxyToQueue(getDestinationString() + ".SHR.B"); + assertNotNull(b); + assertTrue(Wait.waitFor(() -> b.getEnqueueCount() == 1l, 3000, 250)); + assertTrue(Wait.waitFor(() -> b.getQueueSize() == 0l, 3000, 250)); + + QueueViewMBean c = getProxyToQueue(getDestinationString() + ".SHR.C"); + assertNotNull(c); + assertTrue(Wait.waitFor(() -> c.getEnqueueCount() == 1l, 3000, 250)); + assertTrue(Wait.waitFor(() -> c.getQueueSize() == 0l, 3000, 250)); + + // Only 1 message in 1 DLQ means the a message was dropped due to shared message audit + QueueViewMBean sharedDlq = getProxyToQueue("ActiveMQ.DLQ"); + assertNotNull(sharedDlq); + assertTrue(Wait.waitFor(() -> sharedDlq.getEnqueueCount() == 1, 3000, 250)); + assertTrue(Wait.waitFor(() -> sharedDlq.getQueueSize() == 1, 3000, 250)); + } + protected void browseDlq() throws Exception { Enumeration messages = dlqBrowser.getEnumeration(); while (messages.hasMoreElements()) {