git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1350425 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Timothy A. Bish 2012-06-14 23:27:25 +00:00
parent 6f2c998b4d
commit bf1f755299
1 changed files with 27 additions and 9 deletions

View File

@ -20,6 +20,7 @@ import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.BrokerFilter;
import org.apache.activemq.broker.ConnectionContext;
@ -28,7 +29,20 @@ import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.broker.region.TopicSubscription;
import org.apache.activemq.command.*;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.BrokerInfo;
import org.apache.activemq.command.Command;
import org.apache.activemq.command.ConnectionId;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.DestinationInfo;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.ProducerId;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.security.SecurityContext;
import org.apache.activemq.state.ProducerState;
import org.apache.activemq.usage.Usage;
@ -336,10 +350,12 @@ public class AdvisoryBroker extends BrokerFilter {
public void slowConsumer(ConnectionContext context, Destination destination,Subscription subs) {
super.slowConsumer(context, destination,subs);
try {
ActiveMQTopic topic = AdvisorySupport.getSlowConsumerAdvisoryTopic(destination.getActiveMQDestination());
ActiveMQMessage advisoryMessage = new ActiveMQMessage();
advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_CONSUMER_ID, subs.getConsumerInfo().getConsumerId().toString());
fireAdvisory(context, topic, subs.getConsumerInfo(), null, advisoryMessage);
if (!AdvisorySupport.isAdvisoryTopic(destination.getActiveMQDestination())) {
ActiveMQTopic topic = AdvisorySupport.getSlowConsumerAdvisoryTopic(destination.getActiveMQDestination());
ActiveMQMessage advisoryMessage = new ActiveMQMessage();
advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_CONSUMER_ID, subs.getConsumerInfo().getConsumerId().toString());
fireAdvisory(context, topic, subs.getConsumerInfo(), null, advisoryMessage);
}
} catch (Exception e) {
handleFireFailure("slow consumer", e);
}
@ -349,10 +365,12 @@ public class AdvisoryBroker extends BrokerFilter {
public void fastProducer(ConnectionContext context,ProducerInfo producerInfo) {
super.fastProducer(context, producerInfo);
try {
ActiveMQTopic topic = AdvisorySupport.getFastProducerAdvisoryTopic(producerInfo.getDestination());
ActiveMQMessage advisoryMessage = new ActiveMQMessage();
advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_PRODUCER_ID, producerInfo.getProducerId().toString());
fireAdvisory(context, topic, producerInfo, null, advisoryMessage);
if (!AdvisorySupport.isAdvisoryTopic(producerInfo.getDestination())) {
ActiveMQTopic topic = AdvisorySupport.getFastProducerAdvisoryTopic(producerInfo.getDestination());
ActiveMQMessage advisoryMessage = new ActiveMQMessage();
advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_PRODUCER_ID, producerInfo.getProducerId().toString());
fireAdvisory(context, topic, producerInfo, null, advisoryMessage);
}
} catch (Exception e) {
handleFireFailure("fast producer", e);
}