https://issues.apache.org/jira/browse/AMQ-3003 - Allow the option of a DLQ per durable subscription DeadLetterStrategy. Additional attribute destinationPerDurableSubscriber, when true causes default dlq individual prefex to be prepended with the durab sub key, connectionid:subscriberName pair. Modified DeadLetterStrategy interface to pass in message and subscription, to provide fine level controll of dlq name. Also modified Broker interface to provide subscription to message expiry and sendToDLQ methods.

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1084023 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gary Tully 2011-03-21 23:41:44 +00:00
parent d618ab37b0
commit 27c1719757
21 changed files with 179 additions and 57 deletions

View File

@ -251,8 +251,8 @@ public class AdvisoryBroker extends BrokerFilter {
}
@Override
public void messageExpired(ConnectionContext context, MessageReference messageReference) {
super.messageExpired(context, messageReference);
public void messageExpired(ConnectionContext context, MessageReference messageReference, Subscription subscription) {
super.messageExpired(context, messageReference, subscription);
try {
if(!messageReference.isAdvisory()) {
ActiveMQTopic topic = AdvisorySupport.getExpiredMessageTopic(messageReference.getMessage().getDestination());
@ -376,8 +376,9 @@ public class AdvisoryBroker extends BrokerFilter {
}
@Override
public void sendToDeadLetterQueue(ConnectionContext context,MessageReference messageReference){
super.sendToDeadLetterQueue(context, messageReference);
public void sendToDeadLetterQueue(ConnectionContext context, MessageReference messageReference,
Subscription subscription){
super.sendToDeadLetterQueue(context, messageReference, subscription);
try {
if(!messageReference.isAdvisory()) {
ActiveMQTopic topic = AdvisorySupport.getMessageDLQdAdvisoryTopic(messageReference.getMessage().getDestination());

View File

@ -305,19 +305,21 @@ public interface Broker extends Region, Service {
/**
* A Message has Expired
*
*
* @param context
* @param messageReference
* @param subscription, may be null
*/
void messageExpired(ConnectionContext context, MessageReference messageReference);
void messageExpired(ConnectionContext context, MessageReference messageReference, Subscription subscription);
/**
* A message needs to go the a DLQ
*
* @param context
* @param messageReference
* @param subscription, may be null
*/
void sendToDeadLetterQueue(ConnectionContext context, MessageReference messageReference);
void sendToDeadLetterQueue(ConnectionContext context, MessageReference messageReference, Subscription subscription);
/**
* @return the broker sequence id

View File

@ -40,7 +40,6 @@ import org.apache.activemq.command.RemoveSubscriptionInfo;
import org.apache.activemq.command.Response;
import org.apache.activemq.command.SessionInfo;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.network.NetworkBridge;
import org.apache.activemq.store.kahadb.plist.PListStore;
import org.apache.activemq.thread.Scheduler;
import org.apache.activemq.usage.Usage;
@ -254,12 +253,13 @@ public class BrokerFilter implements Broker {
return next.isExpired(messageReference);
}
public void messageExpired(ConnectionContext context, MessageReference message) {
next.messageExpired(context, message);
public void messageExpired(ConnectionContext context, MessageReference message, Subscription subscription) {
next.messageExpired(context, message, subscription);
}
public void sendToDeadLetterQueue(ConnectionContext context, MessageReference messageReference) {
next.sendToDeadLetterQueue(context, messageReference);
public void sendToDeadLetterQueue(ConnectionContext context, MessageReference messageReference,
Subscription subscription) {
next.sendToDeadLetterQueue(context, messageReference, subscription);
}
public Broker getRoot() {

View File

@ -247,10 +247,12 @@ public class EmptyBroker implements Broker {
return false;
}
public void messageExpired(ConnectionContext context, MessageReference message) {
public void messageExpired(ConnectionContext context, MessageReference message, Subscription subscription) {
}
public void sendToDeadLetterQueue(ConnectionContext context, MessageReference messageReference) {
public void sendToDeadLetterQueue(ConnectionContext context,
MessageReference messageReference,
Subscription subscription) {
}
public Broker getRoot() {

View File

@ -256,11 +256,12 @@ public class ErrorBroker implements Broker {
throw new BrokerStoppedException(this.message);
}
public void messageExpired(ConnectionContext context, MessageReference message) {
public void messageExpired(ConnectionContext context, MessageReference message, Subscription subscription) {
throw new BrokerStoppedException(this.message);
}
public void sendToDeadLetterQueue(ConnectionContext context, MessageReference messageReference) {
public void sendToDeadLetterQueue(ConnectionContext context, MessageReference messageReference,
Subscription subscription) {
throw new BrokerStoppedException(this.message);
}

View File

@ -265,12 +265,13 @@ public class MutableBrokerFilter implements Broker {
return getNext().isExpired(messageReference);
}
public void messageExpired(ConnectionContext context, MessageReference message) {
getNext().messageExpired(context, message);
public void messageExpired(ConnectionContext context, MessageReference message, Subscription subscription) {
getNext().messageExpired(context, message, subscription);
}
public void sendToDeadLetterQueue(ConnectionContext context, MessageReference messageReference) {
getNext().sendToDeadLetterQueue(context, messageReference);
public void sendToDeadLetterQueue(ConnectionContext context, MessageReference messageReference,
Subscription subscription) {
getNext().sendToDeadLetterQueue(context, messageReference, subscription);
}
public Broker getRoot() {

View File

@ -453,7 +453,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
* @throws Exception
*/
protected void sendToDLQ(final ConnectionContext context, final MessageReference node) throws IOException, Exception {
broker.getRoot().sendToDeadLetterQueue(context, node);
broker.getRoot().sendToDeadLetterQueue(context, node, this);
}
public int getInFlightSize() {

View File

@ -539,7 +539,7 @@ public class Queue extends BaseDestination implements Task, UsageListener {
&& !context.isInRecoveryMode();
if (message.isExpired()) {
// message not stored - or added to stats yet - so chuck here
broker.getRoot().messageExpired(context, message);
broker.getRoot().messageExpired(context, message, null);
if (sendProducerAck) {
ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), message.getSize());
context.getConnection().dispatchAsync(ack);
@ -591,7 +591,7 @@ public class Queue extends BaseDestination implements Task, UsageListener {
// message may have expired.
if (message.isExpired()) {
LOG.error("expired waiting for space..");
broker.messageExpired(context, message);
broker.messageExpired(context, message, null);
destinationStatistics.getExpired().increment();
} else {
doMessageSend(producerExchangeCopy, message);
@ -644,7 +644,7 @@ public class Queue extends BaseDestination implements Task, UsageListener {
if (LOG.isDebugEnabled()) {
LOG.debug("Expired message: " + message);
}
broker.getRoot().messageExpired(context, message);
broker.getRoot().messageExpired(context, message, null);
return;
}
}
@ -699,7 +699,7 @@ public class Queue extends BaseDestination implements Task, UsageListener {
// It could take while before we receive the commit
// op, by that time the message could have expired..
if (broker.isExpired(message)) {
broker.messageExpired(context, message);
broker.messageExpired(context, message, null);
destinationStatistics.getExpired().increment();
return;
}
@ -1594,7 +1594,7 @@ public class Queue extends BaseDestination implements Task, UsageListener {
if (LOG.isDebugEnabled()) {
LOG.debug("message expired: " + reference);
}
broker.messageExpired(context, reference);
broker.messageExpired(context, reference, subs);
destinationStatistics.getExpired().increment();
try {
removeMessage(context, subs, (QueueMessageReference) reference);

View File

@ -806,16 +806,16 @@ public class RegionBroker extends EmptyBroker {
@Override
public void messageExpired(ConnectionContext context, MessageReference node) {
public void messageExpired(ConnectionContext context, MessageReference node, Subscription subscription) {
if (LOG.isDebugEnabled()) {
LOG.debug("Message expired " + node);
}
getRoot().sendToDeadLetterQueue(context, node);
getRoot().sendToDeadLetterQueue(context, node, null);
}
@Override
public void sendToDeadLetterQueue(ConnectionContext context,
MessageReference node){
MessageReference node, Subscription subscription){
try{
if(node!=null){
Message message=node.getMessage();
@ -838,8 +838,7 @@ public class RegionBroker extends EmptyBroker {
// it is only populated if the message is routed to
// another destination like the DLQ
ActiveMQDestination deadLetterDestination=deadLetterStrategy
.getDeadLetterQueueFor(message
.getDestination());
.getDeadLetterQueueFor(message, subscription);
if (context.getBroker()==null) {
context.setBroker(getRoot());
}

View File

@ -276,7 +276,7 @@ public class Topic extends BaseDestination implements Task {
// There is delay between the client sending it and it arriving at the
// destination.. it may have expired.
if (message.isExpired()) {
broker.messageExpired(context, message);
broker.messageExpired(context, message, null);
getDestinationStatistics().getExpired().increment();
if (sendProducerAck) {
ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), message.getSize());
@ -322,7 +322,7 @@ public class Topic extends BaseDestination implements Task {
// While waiting for space to free up... the
// message may have expired.
if (message.isExpired()) {
broker.messageExpired(context, message);
broker.messageExpired(context, message, null);
getDestinationStatistics().getExpired().increment();
} else {
doMessageSend(producerExchange, message);
@ -451,7 +451,7 @@ public class Topic extends BaseDestination implements Task {
// expired..
if (broker.isExpired(message)) {
getDestinationStatistics().getExpired().increment();
broker.messageExpired(context, message);
broker.messageExpired(context, message, null);
message.decrementReferenceCount();
return;
}
@ -644,7 +644,7 @@ public class Topic extends BaseDestination implements Task {
}
public void messageExpired(ConnectionContext context, Subscription subs, MessageReference reference) {
broker.messageExpired(context, reference);
broker.messageExpired(context, reference, subs);
// AMQ-2586: Better to leave this stat at zero than to give the user
// misleading metrics.
// destinationStatistics.getMessages().decrement();

View File

@ -216,7 +216,7 @@ public class TopicSubscription extends AbstractSubscription {
dispatchedCounter.incrementAndGet();
node.decrementReferenceCount();
node.getRegionDestination().getDestinationStatistics().getExpired().increment();
broker.messageExpired(getContext(), node);
broker.messageExpired(getContext(), node, this);
break;
}
}
@ -543,7 +543,7 @@ public class TopicSubscription extends AbstractSubscription {
if (dest != null) {
dest.messageDiscarded(getContext(), this, message);
}
broker.getRoot().sendToDeadLetterQueue(getContext(), message);
broker.getRoot().sendToDeadLetterQueue(getContext(), message, this);
}
@Override

View File

@ -453,7 +453,8 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple
if (LOG.isDebugEnabled()) {
LOG.debug("Discarding message " + message);
}
broker.getRoot().sendToDeadLetterQueue(new ConnectionContext(new NonCachedMessageEvaluationContext()), message);
broker.getRoot().sendToDeadLetterQueue(new ConnectionContext(new NonCachedMessageEvaluationContext()),
message, null);
}
protected ByteSequence getByteSequence(Message message) throws IOException {

View File

@ -17,6 +17,9 @@
package org.apache.activemq.broker.region.policy;
import org.apache.activemq.ActiveMQMessageAudit;
import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

View File

@ -16,6 +16,8 @@
*/
package org.apache.activemq.broker.region.policy;
import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.Message;
@ -35,10 +37,10 @@ public interface DeadLetterStrategy {
boolean isSendToDeadLetterQueue(Message message);
/**
* Returns the dead letter queue for the given destination.
* Returns the dead letter queue for the given message and subscription.
*/
ActiveMQDestination getDeadLetterQueueFor(ActiveMQDestination originalDestination);
ActiveMQDestination getDeadLetterQueueFor(Message message, Subscription subscription);
/**
* @return true if processes expired messages
*/

View File

@ -16,9 +16,12 @@
*/
package org.apache.activemq.broker.region.policy;
import org.apache.activemq.broker.region.DurableTopicSubscription;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.Message;
/**
* A {@link DeadLetterStrategy} where each destination has its own individual
@ -33,12 +36,14 @@ public class IndividualDeadLetterStrategy extends AbstractDeadLetterStrategy {
private String queuePrefix = "ActiveMQ.DLQ.Queue.";
private boolean useQueueForQueueMessages = true;
private boolean useQueueForTopicMessages = true;
private boolean destinationPerDurableSubscriber;
public ActiveMQDestination getDeadLetterQueueFor(ActiveMQDestination originalDestination) {
if (originalDestination.isQueue()) {
return createDestination(originalDestination, queuePrefix, useQueueForQueueMessages);
public ActiveMQDestination getDeadLetterQueueFor(Message message,
Subscription subscription) {
if (message.getDestination().isQueue()) {
return createDestination(message, queuePrefix, useQueueForQueueMessages, subscription);
} else {
return createDestination(originalDestination, topicPrefix, useQueueForTopicMessages);
return createDestination(message, topicPrefix, useQueueForTopicMessages, subscription);
}
}
@ -91,10 +96,30 @@ public class IndividualDeadLetterStrategy extends AbstractDeadLetterStrategy {
this.useQueueForTopicMessages = useQueueForTopicMessages;
}
public boolean isDestinationPerDurableSubscriber() {
return destinationPerDurableSubscriber;
}
/**
* sets whether durable topic subscriptions are to get individual dead letter destinations.
* When true, the DLQ is of the form 'topicPrefix.clientId:subscriptionName'
* The default is false.
* @param destinationPerDurableSubscriber
*/
public void setDestinationPerDurableSubscriber(boolean destinationPerDurableSubscriber) {
this.destinationPerDurableSubscriber = destinationPerDurableSubscriber;
}
// Implementation methods
// -------------------------------------------------------------------------
protected ActiveMQDestination createDestination(ActiveMQDestination originalDestination, String prefix, boolean useQueue) {
String name = prefix + originalDestination.getPhysicalName();
protected ActiveMQDestination createDestination(Message message,
String prefix,
boolean useQueue,
Subscription subscription ) {
String name = prefix + message.getDestination().getPhysicalName();
if (destinationPerDurableSubscriber && subscription instanceof DurableTopicSubscription) {
name += "." + ((DurableTopicSubscription)subscription).getSubscriptionKey();
}
if (useQueue) {
return new ActiveMQQueue(name);
} else {

View File

@ -16,8 +16,10 @@
*/
package org.apache.activemq.broker.region.policy;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.Message;
/**
* A default implementation of {@link DeadLetterStrategy} which uses
@ -34,7 +36,7 @@ public class SharedDeadLetterStrategy extends AbstractDeadLetterStrategy {
private ActiveMQDestination deadLetterQueue = new ActiveMQQueue(DEFAULT_DEAD_LETTER_QUEUE_NAME);
public ActiveMQDestination getDeadLetterQueueFor(ActiveMQDestination originalDestination) {
public ActiveMQDestination getDeadLetterQueueFor(Message message, Subscription subscription) {
return deadLetterQueue;
}

View File

@ -474,7 +474,7 @@ public class LoggingBrokerPlugin extends BrokerPluginSupport {
}
@Override
public void messageExpired(ConnectionContext context, MessageReference message) {
public void messageExpired(ConnectionContext context, MessageReference message, Subscription subscription) {
if (isLogAll() || isLogInternalEvents()) {
String msg = "Unable to display message.";
@ -482,11 +482,12 @@ public class LoggingBrokerPlugin extends BrokerPluginSupport {
LOG.info("Message has expired : " + msg);
}
super.messageExpired(context, message);
super.messageExpired(context, message, subscription);
}
@Override
public void sendToDeadLetterQueue(ConnectionContext context, MessageReference messageReference) {
public void sendToDeadLetterQueue(ConnectionContext context, MessageReference messageReference,
Subscription subscription) {
if (isLogAll() || isLogInternalEvents()) {
String msg = "Unable to display message.";
@ -494,7 +495,7 @@ public class LoggingBrokerPlugin extends BrokerPluginSupport {
LOG.info("Sending to DLQ : " + msg);
}
super.sendToDeadLetterQueue(context, messageReference);
super.sendToDeadLetterQueue(context, messageReference, subscription);
}
@Override

View File

@ -159,8 +159,8 @@ public class ActiveMQTextMessage extends ActiveMQMessage implements TextMessage
public String toString() {
try {
String text = getText();
if (text != null && text.length() > 63) {
text = text.substring(0, 45) + "..." + text.substring(text.length() - 12);
if (text != null) {
text = MarshallingSupport.truncate64(text);
HashMap<String, Object> overrideFields = new HashMap<String, Object>();
overrideFields.put("text", text);
return super.toString(overrideFields);

View File

@ -21,6 +21,7 @@ import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.BrokerFilter;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.Message;
import org.slf4j.Logger;
@ -44,7 +45,8 @@ public class DiscardingDLQBroker extends BrokerFilter {
}
@Override
public void sendToDeadLetterQueue(ConnectionContext ctx, MessageReference msgRef) {
public void sendToDeadLetterQueue(ConnectionContext ctx, MessageReference msgRef,
Subscription subscription) {
if (log.isTraceEnabled()) {
log.trace("Discarding DLQ BrokerFilter[pass through] - skipping message:" + (msgRef != null ? msgRef.getMessage() : null));
}
@ -73,7 +75,7 @@ public class DiscardingDLQBroker extends BrokerFilter {
skipMessage("dropOnly",msgRef);
} else {
dropped = false;
next.sendToDeadLetterQueue(ctx, msgRef);
next.sendToDeadLetterQueue(ctx, msgRef, subscription);
}
if (dropped && getReportInterval()>0) {
if ((++dropCount)%getReportInterval() == 0 ) {

View File

@ -66,12 +66,16 @@ public abstract class DeadLetterTestSupport extends TestSupport {
broker = createBroker();
broker.start();
connection = createConnection();
connection.setClientID(toString());
connection.setClientID(createClientId());
session = connection.createSession(transactedMode, acknowledgeMode);
connection.start();
}
protected String createClientId() {
return toString();
}
protected void tearDown() throws Exception {
if (connection != null) {
connection.close();

View File

@ -0,0 +1,76 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.policy;
import java.util.Enumeration;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Queue;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
import org.apache.activemq.broker.region.policy.IndividualDeadLetterStrategy;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* for durable subs, allow a dlq per subscriber such that poison messages are not duplicates
* on the dlq and such that rejecting consumers can be identified
* https://issues.apache.org/jira/browse/AMQ-3003
*/
public class PerDurableConsumerDeadLetterTest extends DeadLetterTest {
private static final Logger LOG = LoggerFactory.getLogger(PerDurableConsumerDeadLetterTest.class);
private static final String CLIENT_ID = "george";
protected BrokerService createBroker() throws Exception {
BrokerService broker = super.createBroker();
PolicyEntry policy = new PolicyEntry();
IndividualDeadLetterStrategy strategy = new IndividualDeadLetterStrategy();
strategy.setProcessNonPersistent(true);
strategy.setDestinationPerDurableSubscriber(true);
policy.setDeadLetterStrategy(strategy);
PolicyMap pMap = new PolicyMap();
pMap.setDefaultEntry(policy);
broker.setDestinationPolicy(pMap);
return broker;
}
protected String createClientId() {
return CLIENT_ID;
}
protected Destination createDlqDestination() {
String prefix = topic ? "ActiveMQ.DLQ.Topic." : "ActiveMQ.DLQ.Queue.";
String destinationName = prefix + getClass().getName() + "." + getName();
if (durableSubscriber) {
String subName = // connectionId:SubName
CLIENT_ID + ":" + getDestination().toString();
destinationName += "." + subName ;
}
return new ActiveMQQueue(destinationName);
}
}