mirror of https://github.com/apache/activemq.git
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@956481 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
b8c2d09220
commit
69e3b6c8c8
|
@ -387,6 +387,21 @@ public class AdvisoryBroker extends BrokerFilter {
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void sendToDeadLetterQueue(ConnectionContext context,MessageReference messageReference){
|
||||
super.sendToDeadLetterQueue(context, messageReference);
|
||||
try {
|
||||
if(!messageReference.isAdvisory()) {
|
||||
ActiveMQTopic topic = AdvisorySupport.getMessageDLQdAdvisoryTopic(messageReference.getMessage().getDestination());
|
||||
Message payload = messageReference.getMessage().copy();
|
||||
payload.clearBody();
|
||||
fireAdvisory(context, topic,payload);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
LOG.warn("Failed to fire message consumed advisory");
|
||||
}
|
||||
}
|
||||
|
||||
protected void fireAdvisory(ConnectionContext context, ActiveMQTopic topic, Command command) throws Exception {
|
||||
fireAdvisory(context, topic, command, null);
|
||||
}
|
||||
|
|
|
@ -16,11 +16,11 @@
|
|||
*/
|
||||
package org.apache.activemq.advisory;
|
||||
|
||||
import javax.jms.Destination;
|
||||
import javax.jms.JMSException;
|
||||
import org.apache.activemq.ActiveMQMessageTransformation;
|
||||
import org.apache.activemq.command.ActiveMQDestination;
|
||||
import org.apache.activemq.command.ActiveMQTopic;
|
||||
import javax.jms.Destination;
|
||||
import javax.jms.JMSException;
|
||||
public final class AdvisorySupport {
|
||||
public static final String ADVISORY_TOPIC_PREFIX = "ActiveMQ.Advisory.";
|
||||
public static final ActiveMQTopic CONNECTION_ADVISORY_TOPIC = new ActiveMQTopic(ADVISORY_TOPIC_PREFIX
|
||||
|
@ -45,6 +45,7 @@ public final class AdvisorySupport {
|
|||
public static final String FULL_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "FULL.";
|
||||
public static final String MESSAGE_DELIVERED_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "MessageDelivered.";
|
||||
public static final String MESSAGE_CONSUMED_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "MessageConsumed.";
|
||||
public static final String MESSAGE_DLQ_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "MessageDLQd.";
|
||||
public static final String MASTER_BROKER_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "MasterBroker";
|
||||
public static final String AGENT_TOPIC = "ActiveMQ.Agent";
|
||||
public static final String ADIVSORY_MESSAGE_TYPE = "Advisory";
|
||||
|
@ -186,6 +187,12 @@ public final class AdvisorySupport {
|
|||
return new ActiveMQTopic(name);
|
||||
}
|
||||
|
||||
public static ActiveMQTopic getMessageDLQdAdvisoryTopic(ActiveMQDestination destination) {
|
||||
String name = MESSAGE_DLQ_TOPIC_PREFIX + destination.getDestinationTypeAsString() + "."
|
||||
+ destination.getPhysicalName();
|
||||
return new ActiveMQTopic(name);
|
||||
}
|
||||
|
||||
public static ActiveMQTopic getMasterBrokerAdvisoryTopic(Destination destination) throws JMSException {
|
||||
return getMasterBrokerAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
|
||||
}
|
||||
|
|
|
@ -457,7 +457,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
|
|||
* @throws Exception
|
||||
*/
|
||||
protected void sendToDLQ(final ConnectionContext context, final MessageReference node) throws IOException, Exception {
|
||||
broker.sendToDeadLetterQueue(context, node);
|
||||
broker.getRoot().sendToDeadLetterQueue(context, node);
|
||||
}
|
||||
|
||||
public int getInFlightSize() {
|
||||
|
|
Loading…
Reference in New Issue