From 69e3b6c8c83d2bf0a06a0a7f0e3a8bd44cde907b Mon Sep 17 00:00:00 2001 From: Robert Davies Date: Mon, 21 Jun 2010 07:57:32 +0000 Subject: [PATCH] Fix for https://issues.apache.org/activemq/browse/AMQ-2786 git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@956481 13f79535-47bb-0310-9956-ffa450edef68 --- .../apache/activemq/advisory/AdvisoryBroker.java | 15 +++++++++++++++ .../apache/activemq/advisory/AdvisorySupport.java | 11 +++++++++-- .../broker/region/PrefetchSubscription.java | 2 +- 3 files changed, 25 insertions(+), 3 deletions(-) diff --git a/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java b/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java index 2b5ad65d8a..8dd29025b8 100755 --- a/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java +++ b/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java @@ -386,6 +386,21 @@ public class AdvisoryBroker extends BrokerFilter { LOG.warn("Failed to fire message master broker advisory"); } } + + @Override + public void sendToDeadLetterQueue(ConnectionContext context,MessageReference messageReference){ + super.sendToDeadLetterQueue(context, messageReference); + try { + if(!messageReference.isAdvisory()) { + ActiveMQTopic topic = AdvisorySupport.getMessageDLQdAdvisoryTopic(messageReference.getMessage().getDestination()); + Message payload = messageReference.getMessage().copy(); + payload.clearBody(); + fireAdvisory(context, topic,payload); + } + } catch (Exception e) { + LOG.warn("Failed to fire message consumed advisory"); + } + } protected void fireAdvisory(ConnectionContext context, ActiveMQTopic topic, Command command) throws Exception { fireAdvisory(context, topic, command, null); diff --git a/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java b/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java index 4ad39b0014..822fd1fe92 100755 --- a/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java +++ b/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java @@ -16,11 +16,11 @@ */ package org.apache.activemq.advisory; +import javax.jms.Destination; +import javax.jms.JMSException; import org.apache.activemq.ActiveMQMessageTransformation; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQTopic; -import javax.jms.Destination; -import javax.jms.JMSException; public final class AdvisorySupport { public static final String ADVISORY_TOPIC_PREFIX = "ActiveMQ.Advisory."; public static final ActiveMQTopic CONNECTION_ADVISORY_TOPIC = new ActiveMQTopic(ADVISORY_TOPIC_PREFIX @@ -45,6 +45,7 @@ public final class AdvisorySupport { public static final String FULL_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "FULL."; public static final String MESSAGE_DELIVERED_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "MessageDelivered."; public static final String MESSAGE_CONSUMED_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "MessageConsumed."; + public static final String MESSAGE_DLQ_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "MessageDLQd."; public static final String MASTER_BROKER_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "MasterBroker"; public static final String AGENT_TOPIC = "ActiveMQ.Agent"; public static final String ADIVSORY_MESSAGE_TYPE = "Advisory"; @@ -185,6 +186,12 @@ public final class AdvisorySupport { + destination.getPhysicalName(); return new ActiveMQTopic(name); } + + public static ActiveMQTopic getMessageDLQdAdvisoryTopic(ActiveMQDestination destination) { + String name = MESSAGE_DLQ_TOPIC_PREFIX + destination.getDestinationTypeAsString() + "." + + destination.getPhysicalName(); + return new ActiveMQTopic(name); + } public static ActiveMQTopic getMasterBrokerAdvisoryTopic(Destination destination) throws JMSException { return getMasterBrokerAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination)); diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java index 3843ed73e0..36cc849d6a 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java @@ -457,7 +457,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription { * @throws Exception */ protected void sendToDLQ(final ConnectionContext context, final MessageReference node) throws IOException, Exception { - broker.sendToDeadLetterQueue(context, node); + broker.getRoot().sendToDeadLetterQueue(context, node); } public int getInFlightSize() {