https://issues.apache.org/jira/browse/AMQ-3820 - statistics plugin - subscription statistics

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1331372 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Bosanac Dejan 2012-04-27 12:08:39 +00:00
parent 0b1b932ea7
commit e5c4ffd673
2 changed files with 80 additions and 5 deletions

View File

@ -22,6 +22,8 @@ import org.apache.activemq.broker.BrokerFilter;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.broker.jmx.BrokerViewMBean;
import org.apache.activemq.broker.jmx.SubscriptionViewMBean;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.DestinationStatistics;
import org.apache.activemq.broker.region.RegionBroker;
@ -37,6 +39,9 @@ import org.apache.activemq.util.IdGenerator;
import org.apache.activemq.util.LongSequenceGenerator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.jms.JMSException;
import javax.management.ObjectName;
import java.io.File;
import java.net.URI;
import java.util.Set;
@ -50,9 +55,11 @@ public class StatisticsBroker extends BrokerFilter {
private static Logger LOG = LoggerFactory.getLogger(StatisticsBroker.class);
static final String STATS_DESTINATION_PREFIX = "ActiveMQ.Statistics.Destination";
static final String STATS_BROKER_PREFIX = "ActiveMQ.Statistics.Broker";
static final String STATS_SUBSCRIPTION_PREFIX = "ActiveMQ.Statistics.Subscription";
private static final IdGenerator ID_GENERATOR = new IdGenerator();
private final LongSequenceGenerator messageIdGenerator = new LongSequenceGenerator();
protected final ProducerId advisoryProducerId = new ProducerId();
protected BrokerViewMBean brokerView;
/**
*
@ -80,6 +87,10 @@ public class StatisticsBroker extends BrokerFilter {
STATS_DESTINATION_PREFIX.length());
boolean brokerStats = physicalName.regionMatches(true, 0, STATS_BROKER_PREFIX, 0, STATS_BROKER_PREFIX
.length());
boolean subStats = physicalName.regionMatches(true, 0, STATS_SUBSCRIPTION_PREFIX, 0, STATS_SUBSCRIPTION_PREFIX
.length());
BrokerService brokerService = getBrokerService();
RegionBroker regionBroker = (RegionBroker) brokerService.getRegionBroker();
if (destStats) {
String queueryName = physicalName.substring(STATS_DESTINATION_PREFIX.length(), physicalName.length());
ActiveMQDestination queryDest = ActiveMQDestination.createDestination(queueryName,msgDest.getDestinationType());
@ -108,10 +119,11 @@ public class StatisticsBroker extends BrokerFilter {
sendStats(producerExchange.getConnectionContext(), statsMessage, replyTo);
}
}
} else if (subStats) {
sendSubStats(producerExchange.getConnectionContext(), getBrokerView().getQueueSubscribers(), replyTo);
sendSubStats(producerExchange.getConnectionContext(), getBrokerView().getTopicSubscribers(), replyTo);
} else if (brokerStats) {
ActiveMQMapMessage statsMessage = new ActiveMQMapMessage();
BrokerService brokerService = getBrokerService();
RegionBroker regionBroker = (RegionBroker) brokerService.getRegionBroker();
SystemUsage systemUsage = brokerService.getSystemUsage();
DestinationStatistics stats = regionBroker.getDestinationStatistics();
statsMessage.setString("brokerName", regionBroker.getBrokerName());
@ -165,6 +177,15 @@ public class StatisticsBroker extends BrokerFilter {
}
}
BrokerViewMBean getBrokerView() throws Exception {
if (this.brokerView == null) {
ObjectName brokerName = getBrokerService().getBrokerObjectName();
this.brokerView = (BrokerViewMBean) getBrokerService().getManagementContext().newProxyInstance(brokerName,
BrokerViewMBean.class, true);
}
return this.brokerView;
}
public void start() throws Exception {
super.start();
LOG.info("Starting StatisticsBroker");
@ -174,6 +195,34 @@ public class StatisticsBroker extends BrokerFilter {
super.stop();
}
protected void sendSubStats(ConnectionContext context, ObjectName[] subscribers, ActiveMQDestination replyTo) throws Exception {
for (int i = 0; i < subscribers.length; i++) {
ObjectName name = subscribers[i];
SubscriptionViewMBean subscriber = (SubscriptionViewMBean)getBrokerService().getManagementContext().newProxyInstance(name, SubscriptionViewMBean.class, true);
ActiveMQMapMessage statsMessage = prepareSubscriptionMessage(subscriber);
sendStats(context, statsMessage, replyTo);
}
}
protected ActiveMQMapMessage prepareSubscriptionMessage(SubscriptionViewMBean subscriber) throws JMSException {
ActiveMQMapMessage statsMessage = new ActiveMQMapMessage();
statsMessage.setString("destinationName", subscriber.getDestinationName());
statsMessage.setString("clientId", subscriber.getClientId());
statsMessage.setString("connectionId", subscriber.getConnectionId());
statsMessage.setLong("sessionId", subscriber.getSessionId());
statsMessage.setString("selector", subscriber.getSelector());
statsMessage.setLong("enqueueCounter", subscriber.getEnqueueCounter());
statsMessage.setLong("dequeueCounter", subscriber.getDequeueCounter());
statsMessage.setLong("dispatchedCounter", subscriber.getDispatchedCounter());
statsMessage.setLong("dispatchedQueueSize", subscriber.getDispatchedQueueSize());
statsMessage.setInt("prefetchSize", subscriber.getPrefetchSize());
statsMessage.setInt("maximumPendingMessageLimit", subscriber.getMaximumPendingMessageLimit());
statsMessage.setBoolean("exclusive", subscriber.isExclusive());
statsMessage.setBoolean("retroactive", subscriber.isRetroactive());
statsMessage.setBoolean("slowConsumer", subscriber.isSlowConsumer());
return statsMessage;
}
protected void sendStats(ConnectionContext context, ActiveMQMapMessage msg, ActiveMQDestination replyTo)
throws Exception {
msg.setPersistent(false);

View File

@ -16,14 +16,14 @@
*/
package org.apache.activemq.plugin;
import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerFactory;
import org.apache.activemq.broker.BrokerPlugin;
import org.apache.activemq.broker.BrokerService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.URI;
import java.util.Enumeration;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.MapMessage;
@ -32,7 +32,7 @@ import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import junit.framework.TestCase;
import java.net.URI;
/**
* A BrokerStatisticsPluginTest
@ -91,6 +91,32 @@ public class BrokerStatisticsPluginTest extends TestCase{
*/
}
public void testSubscriptionStats() throws Exception{
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue replyTo = session.createTemporaryQueue();
MessageConsumer consumer = session.createConsumer(replyTo);
Queue testQueue = session.createQueue("Test.Queue");
MessageConsumer testConsumer = session.createConsumer(testQueue);
MessageProducer producer = session.createProducer(null);
Queue query = session.createQueue(StatisticsBroker.STATS_SUBSCRIPTION_PREFIX);
Message msg = session.createMessage();
producer.send(testQueue,msg);
msg.setJMSReplyTo(replyTo);
producer.send(query,msg);
MapMessage reply = (MapMessage) consumer.receive();
assertNotNull(reply);
assertTrue(reply.getMapNames().hasMoreElements());
/*for (Enumeration e = reply.getMapNames();e.hasMoreElements();) {
String name = e.nextElement().toString();
System.err.println(name+"="+reply.getObject(name));
}*/
}
protected void setUp() throws Exception {