added the ability to configure the DLQ policy on a per destination basis; either use 1 global DLQ for all messages or use 1 DLQ for a bunch of messages via a wildcard PolicyEntry or use an individual DLQ per destination (which again can be attached to a wildcard via a PolicyEntry)

fixes AMQ-459

git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@360089 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
James Strachan 2005-12-30 14:05:51 +00:00
parent 20b343110c
commit 88203aea9b
14 changed files with 258 additions and 27 deletions

View File

@ -20,6 +20,7 @@ import java.io.IOException;
import org.apache.activemq.Service; import org.apache.activemq.Service;
import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.Message; import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck; import org.apache.activemq.command.MessageAck;
@ -50,4 +51,5 @@ public interface Destination extends Service {
DestinationStatistics getDestinationStatistics(); DestinationStatistics getDestinationStatistics();
MessageStore getMessageStore(); MessageStore getMessageStore();
DeadLetterStrategy getDeadLetterStrategy();
} }

View File

@ -16,16 +16,9 @@
*/ */
package org.apache.activemq.broker.region; package org.apache.activemq.broker.region;
import java.io.IOException;
import java.util.Iterator;
import java.util.LinkedList;
import javax.jms.InvalidSelectorException;
import javax.jms.JMSException;
import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ConsumerInfo; import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.Message; import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck; import org.apache.activemq.command.MessageAck;
@ -33,6 +26,13 @@ import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.command.MessageId; import org.apache.activemq.command.MessageId;
import org.apache.activemq.transaction.Synchronization; import org.apache.activemq.transaction.Synchronization;
import javax.jms.InvalidSelectorException;
import javax.jms.JMSException;
import java.io.IOException;
import java.util.Iterator;
import java.util.LinkedList;
/** /**
* A subscription that honors the pre-fetch option of the ConsumerInfo. * A subscription that honors the pre-fetch option of the ConsumerInfo.
* *
@ -43,7 +43,6 @@ abstract public class PrefetchSubscription extends AbstractSubscription {
final protected LinkedList matched = new LinkedList(); final protected LinkedList matched = new LinkedList();
final protected LinkedList dispatched = new LinkedList(); final protected LinkedList dispatched = new LinkedList();
final protected ActiveMQDestination dlqDestination = new ActiveMQQueue("ActiveMQ.DLQ");
protected int delivered=0; protected int delivered=0;
int preLoadLimit=1024*100; int preLoadLimit=1024*100;
@ -176,9 +175,13 @@ abstract public class PrefetchSubscription extends AbstractSubscription {
Message message = node.getMessage(); Message message = node.getMessage();
if( message !=null ) { if( message !=null ) {
// TODO is this meant to be == null?
if( message.getOriginalDestination()!=null ) if( message.getOriginalDestination()!=null )
message.setOriginalDestination(message.getDestination()); message.setOriginalDestination(message.getDestination());
message.setDestination(dlqDestination);
ActiveMQDestination originalDestination = message.getOriginalDestination();
DeadLetterStrategy deadLetterStrategy = node.getRegionDestination().getDeadLetterStrategy();
message.setDestination(deadLetterStrategy.getDeadLetterQueueFor(originalDestination));
if( message.getOriginalTransactionId()!=null ) if( message.getOriginalTransactionId()!=null )
message.setOriginalTransactionId(message.getTransactionId()); message.setOriginalTransactionId(message.getTransactionId());

View File

@ -22,6 +22,8 @@ import java.util.Iterator;
import java.util.LinkedList; import java.util.LinkedList;
import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
import org.apache.activemq.broker.region.policy.SharedDeadLetterStrategy;
import org.apache.activemq.broker.region.policy.DispatchPolicy; import org.apache.activemq.broker.region.policy.DispatchPolicy;
import org.apache.activemq.broker.region.policy.RoundRobinDispatchPolicy; import org.apache.activemq.broker.region.policy.RoundRobinDispatchPolicy;
import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQDestination;
@ -67,6 +69,7 @@ public class Queue implements Destination {
private DispatchPolicy dispatchPolicy = new RoundRobinDispatchPolicy(); private DispatchPolicy dispatchPolicy = new RoundRobinDispatchPolicy();
protected final MessageStore store; protected final MessageStore store;
protected int highestSubscriptionPriority; protected int highestSubscriptionPriority;
private DeadLetterStrategy deadLetterStrategy = new SharedDeadLetterStrategy();
public Queue(ActiveMQDestination destination, final UsageManager memoryManager, MessageStore store, public Queue(ActiveMQDestination destination, final UsageManager memoryManager, MessageStore store,
DestinationStatistics parentStats, TaskRunnerFactory taskFactory) throws Throwable { DestinationStatistics parentStats, TaskRunnerFactory taskFactory) throws Throwable {
@ -332,6 +335,14 @@ public class Queue implements Destination {
this.dispatchPolicy = dispatchPolicy; this.dispatchPolicy = dispatchPolicy;
} }
public DeadLetterStrategy getDeadLetterStrategy() {
return deadLetterStrategy;
}
public void setDeadLetterStrategy(DeadLetterStrategy deadLetterStrategy) {
this.deadLetterStrategy = deadLetterStrategy;
}
// Implementation methods // Implementation methods
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
private MessageReference createMessageReference(Message message) { private MessageReference createMessageReference(Message message) {

View File

@ -20,6 +20,8 @@ import java.io.IOException;
import org.apache.activemq.advisory.AdvisorySupport; import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
import org.apache.activemq.broker.region.policy.SharedDeadLetterStrategy;
import org.apache.activemq.broker.region.policy.DispatchPolicy; import org.apache.activemq.broker.region.policy.DispatchPolicy;
import org.apache.activemq.broker.region.policy.LastImageSubscriptionRecoveryPolicy; import org.apache.activemq.broker.region.policy.LastImageSubscriptionRecoveryPolicy;
import org.apache.activemq.broker.region.policy.SimpleDispatchPolicy; import org.apache.activemq.broker.region.policy.SimpleDispatchPolicy;
@ -60,6 +62,7 @@ public class Topic implements Destination {
private DispatchPolicy dispatchPolicy = new SimpleDispatchPolicy(); private DispatchPolicy dispatchPolicy = new SimpleDispatchPolicy();
private SubscriptionRecoveryPolicy subscriptionRecoveryPolicy = new LastImageSubscriptionRecoveryPolicy(); private SubscriptionRecoveryPolicy subscriptionRecoveryPolicy = new LastImageSubscriptionRecoveryPolicy();
private boolean sendAdvisoryIfNoConsumers; private boolean sendAdvisoryIfNoConsumers;
private DeadLetterStrategy deadLetterStrategy = new SharedDeadLetterStrategy();
public Topic(ActiveMQDestination destination, TopicMessageStore store, UsageManager memoryManager, DestinationStatistics parentStats, public Topic(ActiveMQDestination destination, TopicMessageStore store, UsageManager memoryManager, DestinationStatistics parentStats,
TaskRunnerFactory taskFactory) { TaskRunnerFactory taskFactory) {
@ -281,6 +284,14 @@ public class Topic implements Destination {
return store; return store;
} }
public DeadLetterStrategy getDeadLetterStrategy() {
return deadLetterStrategy;
}
public void setDeadLetterStrategy(DeadLetterStrategy deadLetterStrategy) {
this.deadLetterStrategy = deadLetterStrategy;
}
// Implementation methods // Implementation methods
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
protected void dispatch(ConnectionContext context, Message message) throws Throwable { protected void dispatch(ConnectionContext context, Message message) throws Throwable {

View File

@ -0,0 +1,33 @@
/**
*
* Copyright 2005-2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.region.policy;
import org.apache.activemq.command.ActiveMQDestination;
/**
* A strategy for choosing which destination is used for dead letter queue messages.
*
* @version $Revision$
*/
public interface DeadLetterStrategy {
/**
* Returns the dead letter queue for the given destination.
*/
ActiveMQDestination getDeadLetterQueueFor(ActiveMQDestination originalDestination);
}

View File

@ -16,12 +16,12 @@
*/ */
package org.apache.activemq.broker.region.policy; package org.apache.activemq.broker.region.policy;
import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArrayList;
import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.MessageReference; import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.filter.MessageEvaluationContext; import org.apache.activemq.filter.MessageEvaluationContext;
import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArrayList;
/** /**
* Abstraction to allow different dispatching policies to be plugged * Abstraction to allow different dispatching policies to be plugged
* into the region implementations. This is used by a queue to deliver * into the region implementations. This is used by a queue to deliver

View File

@ -16,9 +16,6 @@
*/ */
package org.apache.activemq.broker.region.policy; package org.apache.activemq.broker.region.policy;
import java.util.Iterator;
import java.util.List;
import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.MessageReference; import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.broker.region.Subscription; import org.apache.activemq.broker.region.Subscription;
@ -28,6 +25,9 @@ import org.apache.activemq.memory.list.DestinationBasedMessageList;
import org.apache.activemq.memory.list.MessageList; import org.apache.activemq.memory.list.MessageList;
import org.apache.activemq.memory.list.SimpleMessageList; import org.apache.activemq.memory.list.SimpleMessageList;
import java.util.Iterator;
import java.util.List;
/** /**
* This implementation of {@link SubscriptionRecoveryPolicy} will keep a fixed * This implementation of {@link SubscriptionRecoveryPolicy} will keep a fixed
* amount of memory available in RAM for message history which is evicted in * amount of memory available in RAM for message history which is evicted in

View File

@ -0,0 +1,109 @@
/**
*
* Copyright 2005 LogicBlaze, Inc. http://www.logicblaze.com
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
**/
package org.apache.activemq.broker.region.policy;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
/**
* A {@link DeadLetterStrategy} where each destination has its own individual
* DLQ using the subject naming hierarchy.
*
* @org.xbean.XBean
*
* @version $Revision$
*/
public class IndividualDeadLetterStrategy implements DeadLetterStrategy {
private String topicPrefix = "ActiveMQ.DLQ.Topic.";
private String queuePrefix = "ActiveMQ.DLQ.Queue.";
private boolean useQueueForQueueMessages = true;
private boolean useQueueForTopicMessages = true;
public ActiveMQDestination getDeadLetterQueueFor(ActiveMQDestination originalDestination) {
if (originalDestination.isQueue()) {
return createDestination(originalDestination, queuePrefix, useQueueForQueueMessages);
}
else {
return createDestination(originalDestination, topicPrefix, useQueueForTopicMessages);
}
}
// Properties
// -------------------------------------------------------------------------
public String getQueuePrefix() {
return queuePrefix;
}
/**
* Sets the prefix to use for all dead letter queues for queue messages
*/
public void setQueuePrefix(String queuePrefix) {
this.queuePrefix = queuePrefix;
}
public String getTopicPrefix() {
return topicPrefix;
}
/**
* Sets the prefix to use for all dead letter queues for topic messages
*/
public void setTopicPrefix(String topicPrefix) {
this.topicPrefix = topicPrefix;
}
public boolean isUseQueueForQueueMessages() {
return useQueueForQueueMessages;
}
/**
* Sets whether a queue or topic should be used for queue messages sent to a
* DLQ. The default is to use a Queue
*/
public void setUseQueueForQueueMessages(boolean useQueueForQueueMessages) {
this.useQueueForQueueMessages = useQueueForQueueMessages;
}
public boolean isUseQueueForTopicMessages() {
return useQueueForTopicMessages;
}
/**
* Sets whether a queue or topic should be used for topic messages sent to a
* DLQ. The default is to use a Queue
*/
public void setUseQueueForTopicMessages(boolean useQueueForTopicMessages) {
this.useQueueForTopicMessages = useQueueForTopicMessages;
}
// Implementation methods
// -------------------------------------------------------------------------
protected ActiveMQDestination createDestination(ActiveMQDestination originalDestination, String prefix, boolean useQueue) {
String name = prefix + originalDestination.getPhysicalName();
if (useQueue) {
return new ActiveMQQueue(name);
}
else {
return new ActiveMQTopic(name);
}
}
}

View File

@ -35,6 +35,7 @@ public class PolicyEntry extends DestinationMapEntry {
private SubscriptionRecoveryPolicy subscriptionRecoveryPolicy; private SubscriptionRecoveryPolicy subscriptionRecoveryPolicy;
private RedeliveryPolicy redeliveryPolicy; private RedeliveryPolicy redeliveryPolicy;
private boolean sendAdvisoryIfNoConsumers; private boolean sendAdvisoryIfNoConsumers;
private DeadLetterStrategy deadLetterStrategy = new SharedDeadLetterStrategy();
public void configure(Queue queue) { public void configure(Queue queue) {
if (dispatchPolicy != null) { if (dispatchPolicy != null) {
@ -89,4 +90,17 @@ public class PolicyEntry extends DestinationMapEntry {
public void setSendAdvisoryIfNoConsumers(boolean sendAdvisoryIfNoConsumers) { public void setSendAdvisoryIfNoConsumers(boolean sendAdvisoryIfNoConsumers) {
this.sendAdvisoryIfNoConsumers = sendAdvisoryIfNoConsumers; this.sendAdvisoryIfNoConsumers = sendAdvisoryIfNoConsumers;
} }
public DeadLetterStrategy getDeadLetterStrategy() {
return deadLetterStrategy;
}
/**
* Sets the policy used to determine which dead letter queue destination should be used
*/
public void setDeadLetterStrategy(DeadLetterStrategy deadLetterStrategy) {
this.deadLetterStrategy = deadLetterStrategy;
}
} }

View File

@ -16,14 +16,14 @@
*/ */
package org.apache.activemq.broker.region.policy; package org.apache.activemq.broker.region.policy;
import java.util.Iterator; import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArrayList;
import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.MessageReference; import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.broker.region.Subscription; import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.filter.MessageEvaluationContext; import org.apache.activemq.filter.MessageEvaluationContext;
import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArrayList; import java.util.Iterator;
/** /**
* Simple dispatch policy that sends a message to every subscription that * Simple dispatch policy that sends a message to every subscription that

View File

@ -0,0 +1,48 @@
/**
*
* Copyright 2005 LogicBlaze, Inc. http://www.logicblaze.com
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
**/
package org.apache.activemq.broker.region.policy;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
/**
* A default implementation of {@link DeadLetterStrategy} which uses
* a constant destination.
*
*
* @org.xbean.XBean
*
* @version $Revision$
*/
public class SharedDeadLetterStrategy implements DeadLetterStrategy {
private ActiveMQDestination deadLetterQueue = new ActiveMQQueue("ActiveMQ.DLQ");
public ActiveMQDestination getDeadLetterQueueFor(ActiveMQDestination originalDestination) {
return deadLetterQueue;
}
public ActiveMQDestination getDeadLetterQueue() {
return deadLetterQueue;
}
public void setDeadLetterQueue(ActiveMQDestination deadLetterQueue) {
this.deadLetterQueue = deadLetterQueue;
}
}

View File

@ -16,14 +16,14 @@
*/ */
package org.apache.activemq.broker.region.policy; package org.apache.activemq.broker.region.policy;
import java.util.Iterator; import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArrayList;
import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.MessageReference; import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.broker.region.Subscription; import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.filter.MessageEvaluationContext; import org.apache.activemq.filter.MessageEvaluationContext;
import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArrayList; import java.util.Iterator;
/** /**
* Simple dispatch policy that sends a message to every subscription that * Simple dispatch policy that sends a message to every subscription that

View File

@ -16,14 +16,14 @@
*/ */
package org.apache.activemq.broker.region.policy; package org.apache.activemq.broker.region.policy;
import java.util.Iterator; import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArrayList;
import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.MessageReference; import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.broker.region.Subscription; import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.filter.MessageEvaluationContext; import org.apache.activemq.filter.MessageEvaluationContext;
import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArrayList; import java.util.Iterator;
/** /**
* Dispatch policy that causes every subscription to see messages in the same order. * Dispatch policy that causes every subscription to see messages in the same order.

View File

@ -16,12 +16,6 @@
*/ */
package org.apache.activemq.broker.region.policy; package org.apache.activemq.broker.region.policy;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.MessageReference; import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.broker.region.Subscription; import org.apache.activemq.broker.region.Subscription;
@ -29,6 +23,12 @@ import org.apache.activemq.broker.region.Topic;
import org.apache.activemq.filter.MessageEvaluationContext; import org.apache.activemq.filter.MessageEvaluationContext;
import org.apache.activemq.thread.Scheduler; import org.apache.activemq.thread.Scheduler;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
/** /**
* This implementation of {@link SubscriptionRecoveryPolicy} will keep a timed * This implementation of {@link SubscriptionRecoveryPolicy} will keep a timed
* buffer of messages around in memory and use that to recover new * buffer of messages around in memory and use that to recover new