diff --git a/activemq-core/src/main/java/org/apache/activemq/plugin/StatisticsBroker.java b/activemq-core/src/main/java/org/apache/activemq/plugin/StatisticsBroker.java index cfc5c645b6..5df6996859 100644 --- a/activemq-core/src/main/java/org/apache/activemq/plugin/StatisticsBroker.java +++ b/activemq-core/src/main/java/org/apache/activemq/plugin/StatisticsBroker.java @@ -22,7 +22,6 @@ import java.util.Set; import javax.jms.JMSException; import javax.management.ObjectName; - import org.apache.activemq.advisory.AdvisorySupport; import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.BrokerFilter; @@ -58,6 +57,7 @@ public class StatisticsBroker extends BrokerFilter { static final String STATS_BROKER_PREFIX = "ActiveMQ.Statistics.Broker"; static final String STATS_BROKER_RESET_HEADER = "ActiveMQ.Statistics.Broker.Reset"; static final String STATS_SUBSCRIPTION_PREFIX = "ActiveMQ.Statistics.Subscription"; + static final String STATS_DENOTE_END_LIST = STATS_DESTINATION_PREFIX + ".List.End.With.Null"; private static final IdGenerator ID_GENERATOR = new IdGenerator(); private final LongSequenceGenerator messageIdGenerator = new LongSequenceGenerator(); protected final ProducerId advisoryProducerId = new ProducerId(); @@ -94,10 +94,13 @@ public class StatisticsBroker extends BrokerFilter { BrokerService brokerService = getBrokerService(); RegionBroker regionBroker = (RegionBroker) brokerService.getRegionBroker(); if (destStats) { - String queueryName = physicalName.substring(STATS_DESTINATION_PREFIX.length(), physicalName.length()); - ActiveMQDestination queryDest = ActiveMQDestination.createDestination(queueryName,msgDest.getDestinationType()); - Set set = getDestinations(queryDest); - for (Destination dest : set) { + String destinationName = physicalName.substring(STATS_DESTINATION_PREFIX.length(), physicalName.length()); + String destinationQuery = destinationName.replace(STATS_DENOTE_END_LIST,""); + boolean endListMessage = !destinationName.equals(destinationQuery); + ActiveMQDestination queryDestination = ActiveMQDestination.createDestination(destinationQuery,msgDest.getDestinationType()); + Set destinations = getDestinations(queryDestination); + + for (Destination dest : destinations) { DestinationStatistics stats = dest.getDestinationStatistics(); if (stats != null) { ActiveMQMapMessage statsMessage = new ActiveMQMapMessage(); @@ -121,6 +124,12 @@ public class StatisticsBroker extends BrokerFilter { sendStats(producerExchange.getConnectionContext(), statsMessage, replyTo); } } + if(endListMessage){ + ActiveMQMapMessage statsMessage = new ActiveMQMapMessage(); + statsMessage.setJMSCorrelationID(messageSend.getCorrelationId()); + sendStats(producerExchange.getConnectionContext(),statsMessage,replyTo); + } + } else if (subStats) { sendSubStats(producerExchange.getConnectionContext(), getBrokerView().getQueueSubscribers(), replyTo); sendSubStats(producerExchange.getConnectionContext(), getBrokerView().getTopicSubscribers(), replyTo); @@ -250,4 +259,5 @@ public class StatisticsBroker extends BrokerFilter { context.setProducerFlowControl(originalFlowControl); } } + }