set correct consumer count on consumer advisories

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@612459 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2008-01-16 13:56:24 +00:00
parent e7245768b3
commit 5f4db41d2d
5 changed files with 46 additions and 16 deletions

View File

@ -17,6 +17,7 @@
package org.apache.activemq.advisory;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.activemq.broker.Broker;
@ -83,7 +84,7 @@ public class AdvisoryBroker extends BrokerFilter {
if (!AdvisorySupport.isAdvisoryTopic(info.getDestination())) {
ActiveMQTopic topic = AdvisorySupport.getConsumerAdvisoryTopic(info.getDestination());
consumers.put(info.getConsumerId(), info);
fireConsumerAdvisory(context, topic, info);
fireConsumerAdvisory(context,info.getDestination(), topic, info);
} else {
// We need to replay all the previously collected state objects
@ -114,7 +115,7 @@ public class AdvisoryBroker extends BrokerFilter {
for (Iterator<ProducerInfo> iter = producers.values().iterator(); iter.hasNext();) {
ProducerInfo value = iter.next();
ActiveMQTopic topic = AdvisorySupport.getProducerAdvisoryTopic(value.getDestination());
fireProducerAdvisory(context, topic, value, info.getConsumerId());
fireProducerAdvisory(context, value.getDestination(),topic, value, info.getConsumerId());
}
}
@ -123,7 +124,7 @@ public class AdvisoryBroker extends BrokerFilter {
for (Iterator<ConsumerInfo> iter = consumers.values().iterator(); iter.hasNext();) {
ConsumerInfo value = iter.next();
ActiveMQTopic topic = AdvisorySupport.getConsumerAdvisoryTopic(value.getDestination());
fireConsumerAdvisory(context, topic, value, info.getConsumerId());
fireConsumerAdvisory(context,value.getDestination(), topic, value, info.getConsumerId());
}
}
}
@ -219,7 +220,7 @@ public class AdvisoryBroker extends BrokerFilter {
if (!AdvisorySupport.isAdvisoryTopic(info.getDestination())) {
ActiveMQTopic topic = AdvisorySupport.getConsumerAdvisoryTopic(info.getDestination());
consumers.remove(info.getConsumerId());
fireConsumerAdvisory(context, topic, info.createRemoveCommand());
fireConsumerAdvisory(context,info.getDestination(), topic, info.createRemoveCommand());
}
}
@ -230,7 +231,7 @@ public class AdvisoryBroker extends BrokerFilter {
if (info.getDestination() != null && !AdvisorySupport.isAdvisoryTopic(info.getDestination())) {
ActiveMQTopic topic = AdvisorySupport.getProducerAdvisoryTopic(info.getDestination());
producers.remove(info.getProducerId());
fireProducerAdvisory(context, topic, info.createRemoveCommand());
fireProducerAdvisory(context, info.getDestination(),topic, info.createRemoveCommand());
}
}
@ -253,21 +254,28 @@ public class AdvisoryBroker extends BrokerFilter {
fireAdvisory(context, topic, command, targetConsumerId, advisoryMessage);
}
protected void fireConsumerAdvisory(ConnectionContext context, ActiveMQTopic topic, Command command) throws Exception {
fireConsumerAdvisory(context, topic, command, null);
protected void fireConsumerAdvisory(ConnectionContext context, ActiveMQDestination consumerDestination,ActiveMQTopic topic, Command command) throws Exception {
fireConsumerAdvisory(context, consumerDestination,topic, command, null);
}
protected void fireConsumerAdvisory(ConnectionContext context, ActiveMQTopic topic, Command command, ConsumerId targetConsumerId) throws Exception {
protected void fireConsumerAdvisory(ConnectionContext context, ActiveMQDestination consumerDestination,ActiveMQTopic topic, Command command, ConsumerId targetConsumerId) throws Exception {
ActiveMQMessage advisoryMessage = new ActiveMQMessage();
advisoryMessage.setIntProperty("consumerCount", consumers.size());
int count = 0;
Set<Destination>set = getDestinations(consumerDestination);
if (set != null) {
for (Destination dest:set) {
count += dest.getDestinationStatistics().getConsumers().getCount();
}
}
advisoryMessage.setIntProperty("consumerCount", count);
fireAdvisory(context, topic, command, targetConsumerId, advisoryMessage);
}
protected void fireProducerAdvisory(ConnectionContext context, ActiveMQTopic topic, Command command) throws Exception {
fireProducerAdvisory(context, topic, command, null);
protected void fireProducerAdvisory(ConnectionContext context,ActiveMQDestination producerDestination, ActiveMQTopic topic, Command command) throws Exception {
fireProducerAdvisory(context,producerDestination, topic, command, null);
}
protected void fireProducerAdvisory(ConnectionContext context, ActiveMQTopic topic, Command command, ConsumerId targetConsumerId) throws Exception {
protected void fireProducerAdvisory(ConnectionContext context, ActiveMQDestination producerDestination,ActiveMQTopic topic, Command command, ConsumerId targetConsumerId) throws Exception {
ActiveMQMessage advisoryMessage = new ActiveMQMessage();
advisoryMessage.setIntProperty("producerCount", producers.size());
fireAdvisory(context, topic, command, targetConsumerId, advisoryMessage);

View File

@ -65,7 +65,7 @@ public class BrokerFilter implements Broker {
return next.getDestinationMap();
}
public Set getDestinations(ActiveMQDestination destination) {
public Set <Destination>getDestinations(ActiveMQDestination destination) {
return next.getDestinations(destination);
}

View File

@ -725,16 +725,20 @@ public class BrokerService implements Service {
if (persistenceAdapter == null) {
persistenceAdapter = createPersistenceAdapter();
configureService(persistenceAdapter);
this.persistenceAdapter = registerPersistenceAdapterMBean(persistenceAdapter);
}
return persistenceAdapter;
}
/**
* Sets the persistence adaptor implementation to use for this broker
* @throws IOException
*/
public void setPersistenceAdapter(PersistenceAdapter persistenceAdapter) {
public void setPersistenceAdapter(PersistenceAdapter persistenceAdapter) throws IOException {
this.persistenceAdapter = persistenceAdapter;
configureService(this.persistenceAdapter);
this.persistenceAdapter = registerPersistenceAdapterMBean(persistenceAdapter);
}
public TaskRunnerFactory getTaskRunnerFactory() {
@ -1313,6 +1317,24 @@ public class BrokerService implements Service {
}
}
}
protected PersistenceAdapter registerPersistenceAdapterMBean(PersistenceAdapter adaptor) throws IOException {
MBeanServer mbeanServer = getManagementContext().getMBeanServer();
if (mbeanServer != null) {
}
return adaptor;
}
protected void unregisterPersistenceAdapterMBean(PersistenceAdapter adaptor) throws IOException {
if (isUseJmx()) {
MBeanServer mbeanServer = getManagementContext().getMBeanServer();
if (mbeanServer != null) {
}
}
}
private ObjectName createConnectorObjectName(TransportConnector connector) throws MalformedObjectNameException {
return new ObjectName(managementContext.getJmxDomainName() + ":" + "BrokerName=" + JMXSupport.encodeObjectNamePart(getBrokerName()) + "," + "Type=Connector,"

View File

@ -131,6 +131,6 @@ public interface Region extends Service {
*
* @return a set of matching destination objects.
*/
Set getDestinations(ActiveMQDestination destination);
Set <Destination>getDestinations(ActiveMQDestination destination);
}

View File

@ -120,7 +120,7 @@ public class RegionBroker implements Broker {
return answer;
}
public Set getDestinations(ActiveMQDestination destination) {
public Set <Destination> getDestinations(ActiveMQDestination destination) {
switch (destination.getDestinationType()) {
case ActiveMQDestination.QUEUE_TYPE:
return queueRegion.getDestinations(destination);