From 5f4db41d2da5df90f3f21097b6b80b0941dad050 Mon Sep 17 00:00:00 2001 From: Robert Davies Date: Wed, 16 Jan 2008 13:56:24 +0000 Subject: [PATCH] set correct consumer count on consumer advisories git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@612459 13f79535-47bb-0310-9956-ffa450edef68 --- .../activemq/advisory/AdvisoryBroker.java | 32 ++++++++++++------- .../apache/activemq/broker/BrokerFilter.java | 2 +- .../apache/activemq/broker/BrokerService.java | 24 +++++++++++++- .../apache/activemq/broker/region/Region.java | 2 +- .../activemq/broker/region/RegionBroker.java | 2 +- 5 files changed, 46 insertions(+), 16 deletions(-) diff --git a/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java b/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java index 82630b9e14..c62e68d771 100755 --- a/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java +++ b/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java @@ -17,6 +17,7 @@ package org.apache.activemq.advisory; import java.util.Iterator; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import org.apache.activemq.broker.Broker; @@ -83,7 +84,7 @@ public class AdvisoryBroker extends BrokerFilter { if (!AdvisorySupport.isAdvisoryTopic(info.getDestination())) { ActiveMQTopic topic = AdvisorySupport.getConsumerAdvisoryTopic(info.getDestination()); consumers.put(info.getConsumerId(), info); - fireConsumerAdvisory(context, topic, info); + fireConsumerAdvisory(context,info.getDestination(), topic, info); } else { // We need to replay all the previously collected state objects @@ -114,7 +115,7 @@ public class AdvisoryBroker extends BrokerFilter { for (Iterator iter = producers.values().iterator(); iter.hasNext();) { ProducerInfo value = iter.next(); ActiveMQTopic topic = AdvisorySupport.getProducerAdvisoryTopic(value.getDestination()); - fireProducerAdvisory(context, topic, value, info.getConsumerId()); + fireProducerAdvisory(context, value.getDestination(),topic, value, info.getConsumerId()); } } @@ -123,7 +124,7 @@ public class AdvisoryBroker extends BrokerFilter { for (Iterator iter = consumers.values().iterator(); iter.hasNext();) { ConsumerInfo value = iter.next(); ActiveMQTopic topic = AdvisorySupport.getConsumerAdvisoryTopic(value.getDestination()); - fireConsumerAdvisory(context, topic, value, info.getConsumerId()); + fireConsumerAdvisory(context,value.getDestination(), topic, value, info.getConsumerId()); } } } @@ -219,7 +220,7 @@ public class AdvisoryBroker extends BrokerFilter { if (!AdvisorySupport.isAdvisoryTopic(info.getDestination())) { ActiveMQTopic topic = AdvisorySupport.getConsumerAdvisoryTopic(info.getDestination()); consumers.remove(info.getConsumerId()); - fireConsumerAdvisory(context, topic, info.createRemoveCommand()); + fireConsumerAdvisory(context,info.getDestination(), topic, info.createRemoveCommand()); } } @@ -230,7 +231,7 @@ public class AdvisoryBroker extends BrokerFilter { if (info.getDestination() != null && !AdvisorySupport.isAdvisoryTopic(info.getDestination())) { ActiveMQTopic topic = AdvisorySupport.getProducerAdvisoryTopic(info.getDestination()); producers.remove(info.getProducerId()); - fireProducerAdvisory(context, topic, info.createRemoveCommand()); + fireProducerAdvisory(context, info.getDestination(),topic, info.createRemoveCommand()); } } @@ -253,21 +254,28 @@ public class AdvisoryBroker extends BrokerFilter { fireAdvisory(context, topic, command, targetConsumerId, advisoryMessage); } - protected void fireConsumerAdvisory(ConnectionContext context, ActiveMQTopic topic, Command command) throws Exception { - fireConsumerAdvisory(context, topic, command, null); + protected void fireConsumerAdvisory(ConnectionContext context, ActiveMQDestination consumerDestination,ActiveMQTopic topic, Command command) throws Exception { + fireConsumerAdvisory(context, consumerDestination,topic, command, null); } - protected void fireConsumerAdvisory(ConnectionContext context, ActiveMQTopic topic, Command command, ConsumerId targetConsumerId) throws Exception { + protected void fireConsumerAdvisory(ConnectionContext context, ActiveMQDestination consumerDestination,ActiveMQTopic topic, Command command, ConsumerId targetConsumerId) throws Exception { ActiveMQMessage advisoryMessage = new ActiveMQMessage(); - advisoryMessage.setIntProperty("consumerCount", consumers.size()); + int count = 0; + Setset = getDestinations(consumerDestination); + if (set != null) { + for (Destination dest:set) { + count += dest.getDestinationStatistics().getConsumers().getCount(); + } + } + advisoryMessage.setIntProperty("consumerCount", count); fireAdvisory(context, topic, command, targetConsumerId, advisoryMessage); } - protected void fireProducerAdvisory(ConnectionContext context, ActiveMQTopic topic, Command command) throws Exception { - fireProducerAdvisory(context, topic, command, null); + protected void fireProducerAdvisory(ConnectionContext context,ActiveMQDestination producerDestination, ActiveMQTopic topic, Command command) throws Exception { + fireProducerAdvisory(context,producerDestination, topic, command, null); } - protected void fireProducerAdvisory(ConnectionContext context, ActiveMQTopic topic, Command command, ConsumerId targetConsumerId) throws Exception { + protected void fireProducerAdvisory(ConnectionContext context, ActiveMQDestination producerDestination,ActiveMQTopic topic, Command command, ConsumerId targetConsumerId) throws Exception { ActiveMQMessage advisoryMessage = new ActiveMQMessage(); advisoryMessage.setIntProperty("producerCount", producers.size()); fireAdvisory(context, topic, command, targetConsumerId, advisoryMessage); diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java b/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java index bf98e5c9a5..5ef26d41e4 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java @@ -65,7 +65,7 @@ public class BrokerFilter implements Broker { return next.getDestinationMap(); } - public Set getDestinations(ActiveMQDestination destination) { + public Set getDestinations(ActiveMQDestination destination) { return next.getDestinations(destination); } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java b/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java index df52fb6923..b1b1bdd935 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java @@ -725,16 +725,20 @@ public class BrokerService implements Service { if (persistenceAdapter == null) { persistenceAdapter = createPersistenceAdapter(); configureService(persistenceAdapter); + this.persistenceAdapter = registerPersistenceAdapterMBean(persistenceAdapter); } return persistenceAdapter; } /** * Sets the persistence adaptor implementation to use for this broker + * @throws IOException */ - public void setPersistenceAdapter(PersistenceAdapter persistenceAdapter) { + public void setPersistenceAdapter(PersistenceAdapter persistenceAdapter) throws IOException { this.persistenceAdapter = persistenceAdapter; configureService(this.persistenceAdapter); + this.persistenceAdapter = registerPersistenceAdapterMBean(persistenceAdapter); + } public TaskRunnerFactory getTaskRunnerFactory() { @@ -1313,6 +1317,24 @@ public class BrokerService implements Service { } } } + + protected PersistenceAdapter registerPersistenceAdapterMBean(PersistenceAdapter adaptor) throws IOException { + MBeanServer mbeanServer = getManagementContext().getMBeanServer(); + if (mbeanServer != null) { + + + } + return adaptor; + } + + protected void unregisterPersistenceAdapterMBean(PersistenceAdapter adaptor) throws IOException { + if (isUseJmx()) { + MBeanServer mbeanServer = getManagementContext().getMBeanServer(); + if (mbeanServer != null) { + + } + } + } private ObjectName createConnectorObjectName(TransportConnector connector) throws MalformedObjectNameException { return new ObjectName(managementContext.getJmxDomainName() + ":" + "BrokerName=" + JMXSupport.encodeObjectNamePart(getBrokerName()) + "," + "Type=Connector," diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/Region.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/Region.java index 62058d69c6..58c3610ff5 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/Region.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/Region.java @@ -131,6 +131,6 @@ public interface Region extends Service { * * @return a set of matching destination objects. */ - Set getDestinations(ActiveMQDestination destination); + Set getDestinations(ActiveMQDestination destination); } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java index 443d164bb6..511d4f7bef 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java @@ -120,7 +120,7 @@ public class RegionBroker implements Broker { return answer; } - public Set getDestinations(ActiveMQDestination destination) { + public Set getDestinations(ActiveMQDestination destination) { switch (destination.getDestinationType()) { case ActiveMQDestination.QUEUE_TYPE: return queueRegion.getDestinations(destination);