From 88203aea9bbf4ea1284f2350cd28f1ee21b1d953 Mon Sep 17 00:00:00 2001 From: James Strachan Date: Fri, 30 Dec 2005 14:05:51 +0000 Subject: [PATCH] added the ability to configure the DLQ policy on a per destination basis; either use 1 global DLQ for all messages or use 1 DLQ for a bunch of messages via a wildcard PolicyEntry or use an individual DLQ per destination (which again can be attached to a wildcard via a PolicyEntry) fixes AMQ-459 git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@360089 13f79535-47bb-0310-9956-ffa450edef68 --- .../activemq/broker/region/Destination.java | 2 + .../broker/region/PrefetchSubscription.java | 23 ++-- .../apache/activemq/broker/region/Queue.java | 11 ++ .../apache/activemq/broker/region/Topic.java | 11 ++ .../region/policy/DeadLetterStrategy.java | 33 ++++++ .../broker/region/policy/DispatchPolicy.java | 4 +- .../FixedSizedSubscriptionRecoveryPolicy.java | 6 +- .../policy/IndividualDeadLetterStrategy.java | 109 ++++++++++++++++++ .../broker/region/policy/PolicyEntry.java | 14 +++ .../policy/RoundRobinDispatchPolicy.java | 4 +- .../policy/SharedDeadLetterStrategy.java | 48 ++++++++ .../region/policy/SimpleDispatchPolicy.java | 4 +- .../policy/StrictOrderDispatchPolicy.java | 4 +- .../TimedSubscriptionRecoveryPolicy.java | 12 +- 14 files changed, 258 insertions(+), 27 deletions(-) create mode 100644 activemq-core/src/main/java/org/apache/activemq/broker/region/policy/DeadLetterStrategy.java create mode 100644 activemq-core/src/main/java/org/apache/activemq/broker/region/policy/IndividualDeadLetterStrategy.java create mode 100644 activemq-core/src/main/java/org/apache/activemq/broker/region/policy/SharedDeadLetterStrategy.java diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java index 0bcb3b22a1..8032099302 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java @@ -20,6 +20,7 @@ import java.io.IOException; import org.apache.activemq.Service; import org.apache.activemq.broker.ConnectionContext; +import org.apache.activemq.broker.region.policy.DeadLetterStrategy; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.Message; import org.apache.activemq.command.MessageAck; @@ -50,4 +51,5 @@ public interface Destination extends Service { DestinationStatistics getDestinationStatistics(); MessageStore getMessageStore(); + DeadLetterStrategy getDeadLetterStrategy(); } \ No newline at end of file diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java index 1382d05f0e..afed99c2e9 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java @@ -16,16 +16,9 @@ */ package org.apache.activemq.broker.region; -import java.io.IOException; -import java.util.Iterator; -import java.util.LinkedList; - -import javax.jms.InvalidSelectorException; -import javax.jms.JMSException; - import org.apache.activemq.broker.ConnectionContext; +import org.apache.activemq.broker.region.policy.DeadLetterStrategy; import org.apache.activemq.command.ActiveMQDestination; -import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ConsumerInfo; import org.apache.activemq.command.Message; import org.apache.activemq.command.MessageAck; @@ -33,6 +26,13 @@ import org.apache.activemq.command.MessageDispatch; import org.apache.activemq.command.MessageId; import org.apache.activemq.transaction.Synchronization; +import javax.jms.InvalidSelectorException; +import javax.jms.JMSException; + +import java.io.IOException; +import java.util.Iterator; +import java.util.LinkedList; + /** * A subscription that honors the pre-fetch option of the ConsumerInfo. * @@ -43,7 +43,6 @@ abstract public class PrefetchSubscription extends AbstractSubscription { final protected LinkedList matched = new LinkedList(); final protected LinkedList dispatched = new LinkedList(); - final protected ActiveMQDestination dlqDestination = new ActiveMQQueue("ActiveMQ.DLQ"); protected int delivered=0; int preLoadLimit=1024*100; @@ -176,9 +175,13 @@ abstract public class PrefetchSubscription extends AbstractSubscription { Message message = node.getMessage(); if( message !=null ) { + // TODO is this meant to be == null? if( message.getOriginalDestination()!=null ) message.setOriginalDestination(message.getDestination()); - message.setDestination(dlqDestination); + + ActiveMQDestination originalDestination = message.getOriginalDestination(); + DeadLetterStrategy deadLetterStrategy = node.getRegionDestination().getDeadLetterStrategy(); + message.setDestination(deadLetterStrategy.getDeadLetterQueueFor(originalDestination)); if( message.getOriginalTransactionId()!=null ) message.setOriginalTransactionId(message.getTransactionId()); diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java index e481e9b6aa..41a87617ce 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java @@ -22,6 +22,8 @@ import java.util.Iterator; import java.util.LinkedList; import org.apache.activemq.broker.ConnectionContext; +import org.apache.activemq.broker.region.policy.DeadLetterStrategy; +import org.apache.activemq.broker.region.policy.SharedDeadLetterStrategy; import org.apache.activemq.broker.region.policy.DispatchPolicy; import org.apache.activemq.broker.region.policy.RoundRobinDispatchPolicy; import org.apache.activemq.command.ActiveMQDestination; @@ -67,6 +69,7 @@ public class Queue implements Destination { private DispatchPolicy dispatchPolicy = new RoundRobinDispatchPolicy(); protected final MessageStore store; protected int highestSubscriptionPriority; + private DeadLetterStrategy deadLetterStrategy = new SharedDeadLetterStrategy(); public Queue(ActiveMQDestination destination, final UsageManager memoryManager, MessageStore store, DestinationStatistics parentStats, TaskRunnerFactory taskFactory) throws Throwable { @@ -332,6 +335,14 @@ public class Queue implements Destination { this.dispatchPolicy = dispatchPolicy; } + public DeadLetterStrategy getDeadLetterStrategy() { + return deadLetterStrategy; + } + + public void setDeadLetterStrategy(DeadLetterStrategy deadLetterStrategy) { + this.deadLetterStrategy = deadLetterStrategy; + } + // Implementation methods // ------------------------------------------------------------------------- private MessageReference createMessageReference(Message message) { diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java index 0643578572..3e9577fc34 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java @@ -20,6 +20,8 @@ import java.io.IOException; import org.apache.activemq.advisory.AdvisorySupport; import org.apache.activemq.broker.ConnectionContext; +import org.apache.activemq.broker.region.policy.DeadLetterStrategy; +import org.apache.activemq.broker.region.policy.SharedDeadLetterStrategy; import org.apache.activemq.broker.region.policy.DispatchPolicy; import org.apache.activemq.broker.region.policy.LastImageSubscriptionRecoveryPolicy; import org.apache.activemq.broker.region.policy.SimpleDispatchPolicy; @@ -60,6 +62,7 @@ public class Topic implements Destination { private DispatchPolicy dispatchPolicy = new SimpleDispatchPolicy(); private SubscriptionRecoveryPolicy subscriptionRecoveryPolicy = new LastImageSubscriptionRecoveryPolicy(); private boolean sendAdvisoryIfNoConsumers; + private DeadLetterStrategy deadLetterStrategy = new SharedDeadLetterStrategy(); public Topic(ActiveMQDestination destination, TopicMessageStore store, UsageManager memoryManager, DestinationStatistics parentStats, TaskRunnerFactory taskFactory) { @@ -281,6 +284,14 @@ public class Topic implements Destination { return store; } + public DeadLetterStrategy getDeadLetterStrategy() { + return deadLetterStrategy; + } + + public void setDeadLetterStrategy(DeadLetterStrategy deadLetterStrategy) { + this.deadLetterStrategy = deadLetterStrategy; + } + // Implementation methods // ------------------------------------------------------------------------- protected void dispatch(ConnectionContext context, Message message) throws Throwable { diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/DeadLetterStrategy.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/DeadLetterStrategy.java new file mode 100644 index 0000000000..2eb4984369 --- /dev/null +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/DeadLetterStrategy.java @@ -0,0 +1,33 @@ +/** + * + * Copyright 2005-2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.region.policy; + +import org.apache.activemq.command.ActiveMQDestination; + +/** + * A strategy for choosing which destination is used for dead letter queue messages. + * + * @version $Revision$ + */ +public interface DeadLetterStrategy { + + /** + * Returns the dead letter queue for the given destination. + */ + ActiveMQDestination getDeadLetterQueueFor(ActiveMQDestination originalDestination); + +} diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/DispatchPolicy.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/DispatchPolicy.java index 3f9d3c0693..37578be39a 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/DispatchPolicy.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/DispatchPolicy.java @@ -16,12 +16,12 @@ */ package org.apache.activemq.broker.region.policy; +import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArrayList; + import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.region.MessageReference; import org.apache.activemq.filter.MessageEvaluationContext; -import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArrayList; - /** * Abstraction to allow different dispatching policies to be plugged * into the region implementations. This is used by a queue to deliver diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FixedSizedSubscriptionRecoveryPolicy.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FixedSizedSubscriptionRecoveryPolicy.java index 12ed0743e7..0591df5d7f 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FixedSizedSubscriptionRecoveryPolicy.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FixedSizedSubscriptionRecoveryPolicy.java @@ -16,9 +16,6 @@ */ package org.apache.activemq.broker.region.policy; -import java.util.Iterator; -import java.util.List; - import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.region.MessageReference; import org.apache.activemq.broker.region.Subscription; @@ -28,6 +25,9 @@ import org.apache.activemq.memory.list.DestinationBasedMessageList; import org.apache.activemq.memory.list.MessageList; import org.apache.activemq.memory.list.SimpleMessageList; +import java.util.Iterator; +import java.util.List; + /** * This implementation of {@link SubscriptionRecoveryPolicy} will keep a fixed * amount of memory available in RAM for message history which is evicted in diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/IndividualDeadLetterStrategy.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/IndividualDeadLetterStrategy.java new file mode 100644 index 0000000000..1935cdd204 --- /dev/null +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/IndividualDeadLetterStrategy.java @@ -0,0 +1,109 @@ +/** + * + * Copyright 2005 LogicBlaze, Inc. http://www.logicblaze.com + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + **/ +package org.apache.activemq.broker.region.policy; + +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.command.ActiveMQTopic; + +/** + * A {@link DeadLetterStrategy} where each destination has its own individual + * DLQ using the subject naming hierarchy. + * + * @org.xbean.XBean + * + * @version $Revision$ + */ +public class IndividualDeadLetterStrategy implements DeadLetterStrategy { + + private String topicPrefix = "ActiveMQ.DLQ.Topic."; + private String queuePrefix = "ActiveMQ.DLQ.Queue."; + private boolean useQueueForQueueMessages = true; + private boolean useQueueForTopicMessages = true; + + public ActiveMQDestination getDeadLetterQueueFor(ActiveMQDestination originalDestination) { + if (originalDestination.isQueue()) { + return createDestination(originalDestination, queuePrefix, useQueueForQueueMessages); + } + else { + return createDestination(originalDestination, topicPrefix, useQueueForTopicMessages); + } + } + + // Properties + // ------------------------------------------------------------------------- + + public String getQueuePrefix() { + return queuePrefix; + } + + /** + * Sets the prefix to use for all dead letter queues for queue messages + */ + public void setQueuePrefix(String queuePrefix) { + this.queuePrefix = queuePrefix; + } + + public String getTopicPrefix() { + return topicPrefix; + } + + /** + * Sets the prefix to use for all dead letter queues for topic messages + */ + public void setTopicPrefix(String topicPrefix) { + this.topicPrefix = topicPrefix; + } + + public boolean isUseQueueForQueueMessages() { + return useQueueForQueueMessages; + } + + /** + * Sets whether a queue or topic should be used for queue messages sent to a + * DLQ. The default is to use a Queue + */ + public void setUseQueueForQueueMessages(boolean useQueueForQueueMessages) { + this.useQueueForQueueMessages = useQueueForQueueMessages; + } + + public boolean isUseQueueForTopicMessages() { + return useQueueForTopicMessages; + } + + /** + * Sets whether a queue or topic should be used for topic messages sent to a + * DLQ. The default is to use a Queue + */ + public void setUseQueueForTopicMessages(boolean useQueueForTopicMessages) { + this.useQueueForTopicMessages = useQueueForTopicMessages; + } + + // Implementation methods + // ------------------------------------------------------------------------- + protected ActiveMQDestination createDestination(ActiveMQDestination originalDestination, String prefix, boolean useQueue) { + String name = prefix + originalDestination.getPhysicalName(); + if (useQueue) { + return new ActiveMQQueue(name); + } + else { + return new ActiveMQTopic(name); + } + } + +} diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java index 3f71a39efb..23ea86aaa7 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java @@ -35,6 +35,7 @@ public class PolicyEntry extends DestinationMapEntry { private SubscriptionRecoveryPolicy subscriptionRecoveryPolicy; private RedeliveryPolicy redeliveryPolicy; private boolean sendAdvisoryIfNoConsumers; + private DeadLetterStrategy deadLetterStrategy = new SharedDeadLetterStrategy(); public void configure(Queue queue) { if (dispatchPolicy != null) { @@ -89,4 +90,17 @@ public class PolicyEntry extends DestinationMapEntry { public void setSendAdvisoryIfNoConsumers(boolean sendAdvisoryIfNoConsumers) { this.sendAdvisoryIfNoConsumers = sendAdvisoryIfNoConsumers; } + + public DeadLetterStrategy getDeadLetterStrategy() { + return deadLetterStrategy; + } + + /** + * Sets the policy used to determine which dead letter queue destination should be used + */ + public void setDeadLetterStrategy(DeadLetterStrategy deadLetterStrategy) { + this.deadLetterStrategy = deadLetterStrategy; + } + + } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/RoundRobinDispatchPolicy.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/RoundRobinDispatchPolicy.java index c560e6bcf9..d4eacd7537 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/RoundRobinDispatchPolicy.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/RoundRobinDispatchPolicy.java @@ -16,14 +16,14 @@ */ package org.apache.activemq.broker.region.policy; -import java.util.Iterator; +import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArrayList; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.region.MessageReference; import org.apache.activemq.broker.region.Subscription; import org.apache.activemq.filter.MessageEvaluationContext; -import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArrayList; +import java.util.Iterator; /** * Simple dispatch policy that sends a message to every subscription that diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/SharedDeadLetterStrategy.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/SharedDeadLetterStrategy.java new file mode 100644 index 0000000000..26b598d5a7 --- /dev/null +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/SharedDeadLetterStrategy.java @@ -0,0 +1,48 @@ +/** + * + * Copyright 2005 LogicBlaze, Inc. http://www.logicblaze.com + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + **/ +package org.apache.activemq.broker.region.policy; + +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ActiveMQQueue; + +/** + * A default implementation of {@link DeadLetterStrategy} which uses + * a constant destination. + * + * + * @org.xbean.XBean + * + * @version $Revision$ + */ +public class SharedDeadLetterStrategy implements DeadLetterStrategy { + + private ActiveMQDestination deadLetterQueue = new ActiveMQQueue("ActiveMQ.DLQ"); + + public ActiveMQDestination getDeadLetterQueueFor(ActiveMQDestination originalDestination) { + return deadLetterQueue; + } + + public ActiveMQDestination getDeadLetterQueue() { + return deadLetterQueue; + } + + public void setDeadLetterQueue(ActiveMQDestination deadLetterQueue) { + this.deadLetterQueue = deadLetterQueue; + } + +} diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/SimpleDispatchPolicy.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/SimpleDispatchPolicy.java index 595bffbae6..b26e73f04a 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/SimpleDispatchPolicy.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/SimpleDispatchPolicy.java @@ -16,14 +16,14 @@ */ package org.apache.activemq.broker.region.policy; -import java.util.Iterator; +import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArrayList; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.region.MessageReference; import org.apache.activemq.broker.region.Subscription; import org.apache.activemq.filter.MessageEvaluationContext; -import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArrayList; +import java.util.Iterator; /** * Simple dispatch policy that sends a message to every subscription that diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/StrictOrderDispatchPolicy.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/StrictOrderDispatchPolicy.java index a88a7837a2..14ec450b55 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/StrictOrderDispatchPolicy.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/StrictOrderDispatchPolicy.java @@ -16,14 +16,14 @@ */ package org.apache.activemq.broker.region.policy; -import java.util.Iterator; +import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArrayList; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.region.MessageReference; import org.apache.activemq.broker.region.Subscription; import org.apache.activemq.filter.MessageEvaluationContext; -import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArrayList; +import java.util.Iterator; /** * Dispatch policy that causes every subscription to see messages in the same order. diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/TimedSubscriptionRecoveryPolicy.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/TimedSubscriptionRecoveryPolicy.java index c4645423dc..d9130a72fc 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/TimedSubscriptionRecoveryPolicy.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/TimedSubscriptionRecoveryPolicy.java @@ -16,12 +16,6 @@ */ package org.apache.activemq.broker.region.policy; -import java.util.ArrayList; -import java.util.Collections; -import java.util.Iterator; -import java.util.LinkedList; -import java.util.List; - import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.region.MessageReference; import org.apache.activemq.broker.region.Subscription; @@ -29,6 +23,12 @@ import org.apache.activemq.broker.region.Topic; import org.apache.activemq.filter.MessageEvaluationContext; import org.apache.activemq.thread.Scheduler; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; + /** * This implementation of {@link SubscriptionRecoveryPolicy} will keep a timed * buffer of messages around in memory and use that to recover new