git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1349585 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Timothy A. Bish 2012-06-12 23:38:40 +00:00
parent 59b38fcc43
commit ccf601e4e4
2 changed files with 65 additions and 34 deletions

View File

@ -16,6 +16,13 @@
*/ */
package org.apache.activemq.plugin; package org.apache.activemq.plugin;
import java.io.File;
import java.net.URI;
import java.util.Set;
import javax.jms.JMSException;
import javax.management.ObjectName;
import org.apache.activemq.advisory.AdvisorySupport; import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.BrokerFilter; import org.apache.activemq.broker.BrokerFilter;
@ -39,22 +46,17 @@ import org.apache.activemq.util.IdGenerator;
import org.apache.activemq.util.LongSequenceGenerator; import org.apache.activemq.util.LongSequenceGenerator;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import javax.jms.JMSException;
import javax.management.ObjectName;
import java.io.File;
import java.net.URI;
import java.util.Set;
/** /**
* A StatisticsBroker You can retrieve a Map Message for a Destination - or * A StatisticsBroker You can retrieve a Map Message for a Destination - or
* Broker containing statistics as key-value pairs The message must contain a * Broker containing statistics as key-value pairs The message must contain a
* replyTo Destination - else its ignored * replyTo Destination - else its ignored
* *
*/ */
public class StatisticsBroker extends BrokerFilter { public class StatisticsBroker extends BrokerFilter {
private static Logger LOG = LoggerFactory.getLogger(StatisticsBroker.class); private static Logger LOG = LoggerFactory.getLogger(StatisticsBroker.class);
static final String STATS_DESTINATION_PREFIX = "ActiveMQ.Statistics.Destination"; static final String STATS_DESTINATION_PREFIX = "ActiveMQ.Statistics.Destination";
static final String STATS_BROKER_PREFIX = "ActiveMQ.Statistics.Broker"; static final String STATS_BROKER_PREFIX = "ActiveMQ.Statistics.Broker";
static final String STATS_BROKER_RESET_HEADER = "ActiveMQ.Statistics.Broker.Reset";
static final String STATS_SUBSCRIPTION_PREFIX = "ActiveMQ.Statistics.Subscription"; static final String STATS_SUBSCRIPTION_PREFIX = "ActiveMQ.Statistics.Subscription";
private static final IdGenerator ID_GENERATOR = new IdGenerator(); private static final IdGenerator ID_GENERATOR = new IdGenerator();
private final LongSequenceGenerator messageIdGenerator = new LongSequenceGenerator(); private final LongSequenceGenerator messageIdGenerator = new LongSequenceGenerator();
@ -62,9 +64,9 @@ public class StatisticsBroker extends BrokerFilter {
protected BrokerViewMBean brokerView; protected BrokerViewMBean brokerView;
/** /**
* *
* Constructor * Constructor
* *
* @param next * @param next
*/ */
public StatisticsBroker(Broker next) { public StatisticsBroker(Broker next) {
@ -74,7 +76,7 @@ public class StatisticsBroker extends BrokerFilter {
/** /**
* Sets the persistence mode * Sets the persistence mode
* *
* @see org.apache.activemq.broker.BrokerFilter#send(org.apache.activemq.broker.ProducerBrokerExchange, * @see org.apache.activemq.broker.BrokerFilter#send(org.apache.activemq.broker.ProducerBrokerExchange,
* org.apache.activemq.command.Message) * org.apache.activemq.command.Message)
*/ */
@ -123,6 +125,11 @@ public class StatisticsBroker extends BrokerFilter {
sendSubStats(producerExchange.getConnectionContext(), getBrokerView().getQueueSubscribers(), replyTo); sendSubStats(producerExchange.getConnectionContext(), getBrokerView().getQueueSubscribers(), replyTo);
sendSubStats(producerExchange.getConnectionContext(), getBrokerView().getTopicSubscribers(), replyTo); sendSubStats(producerExchange.getConnectionContext(), getBrokerView().getTopicSubscribers(), replyTo);
} else if (brokerStats) { } else if (brokerStats) {
if (messageSend.getProperties().containsKey(STATS_BROKER_RESET_HEADER)) {
getBrokerView().resetStatistics();
}
ActiveMQMapMessage statsMessage = new ActiveMQMapMessage(); ActiveMQMapMessage statsMessage = new ActiveMQMapMessage();
SystemUsage systemUsage = brokerService.getSystemUsage(); SystemUsage systemUsage = brokerService.getSystemUsage();
DestinationStatistics stats = regionBroker.getDestinationStatistics(); DestinationStatistics stats = regionBroker.getDestinationStatistics();

View File

@ -16,13 +16,7 @@
*/ */
package org.apache.activemq.plugin; package org.apache.activemq.plugin;
import junit.framework.TestCase; import java.net.URI;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerFactory;
import org.apache.activemq.broker.BrokerPlugin;
import org.apache.activemq.broker.BrokerService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.jms.Connection; import javax.jms.Connection;
import javax.jms.ConnectionFactory; import javax.jms.ConnectionFactory;
@ -32,7 +26,15 @@ import javax.jms.MessageConsumer;
import javax.jms.MessageProducer; import javax.jms.MessageProducer;
import javax.jms.Queue; import javax.jms.Queue;
import javax.jms.Session; import javax.jms.Session;
import java.net.URI;
import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerFactory;
import org.apache.activemq.broker.BrokerPlugin;
import org.apache.activemq.broker.BrokerService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/** /**
* A BrokerStatisticsPluginTest * A BrokerStatisticsPluginTest
@ -41,10 +43,10 @@ import java.net.URI;
*/ */
public class BrokerStatisticsPluginTest extends TestCase{ public class BrokerStatisticsPluginTest extends TestCase{
private static final Logger LOG = LoggerFactory.getLogger(BrokerStatisticsPluginTest.class); private static final Logger LOG = LoggerFactory.getLogger(BrokerStatisticsPluginTest.class);
private Connection connection; private Connection connection;
private BrokerService broker; private BrokerService broker;
public void testBrokerStats() throws Exception{ public void testBrokerStats() throws Exception{
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue replyTo = session.createTemporaryQueue(); Queue replyTo = session.createTemporaryQueue();
@ -63,10 +65,36 @@ public class BrokerStatisticsPluginTest extends TestCase{
System.err.println(name+"="+reply.getObject(name)); System.err.println(name+"="+reply.getObject(name));
} }
*/ */
} }
public void testBrokerStatsReset() throws Exception{
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue replyTo = session.createTemporaryQueue();
MessageConsumer consumer = session.createConsumer(replyTo);
Queue testQueue = session.createQueue("Test.Queue");
Queue query = session.createQueue(StatisticsBroker.STATS_BROKER_PREFIX);
MessageProducer producer = session.createProducer(null);
producer.send(testQueue, session.createMessage());
Message msg = session.createMessage();
msg.setJMSReplyTo(replyTo);
producer.send(query, msg);
MapMessage reply = (MapMessage) consumer.receive(10*1000);
assertNotNull(reply);
assertTrue(reply.getMapNames().hasMoreElements());
assertTrue(reply.getLong("enqueueCount") >= 1);
msg = session.createMessage();
msg.setBooleanProperty(StatisticsBroker.STATS_BROKER_RESET_HEADER, true);
msg.setJMSReplyTo(replyTo);
producer.send(query, msg);
reply = (MapMessage) consumer.receive(10*1000);
assertNotNull(reply);
assertTrue(reply.getMapNames().hasMoreElements());
assertEquals(0, reply.getLong("enqueueCount"));
}
public void testDestinationStats() throws Exception{ public void testDestinationStats() throws Exception{
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue replyTo = session.createTemporaryQueue(); Queue replyTo = session.createTemporaryQueue();
@ -75,9 +103,9 @@ public class BrokerStatisticsPluginTest extends TestCase{
MessageProducer producer = session.createProducer(null); MessageProducer producer = session.createProducer(null);
Queue query = session.createQueue(StatisticsBroker.STATS_DESTINATION_PREFIX + testQueue.getQueueName()); Queue query = session.createQueue(StatisticsBroker.STATS_DESTINATION_PREFIX + testQueue.getQueueName());
Message msg = session.createMessage(); Message msg = session.createMessage();
producer.send(testQueue,msg); producer.send(testQueue,msg);
msg.setJMSReplyTo(replyTo); msg.setJMSReplyTo(replyTo);
producer.send(query,msg); producer.send(query,msg);
MapMessage reply = (MapMessage) consumer.receive(); MapMessage reply = (MapMessage) consumer.receive();
@ -89,10 +117,9 @@ public class BrokerStatisticsPluginTest extends TestCase{
System.err.println(name+"="+reply.getObject(name)); System.err.println(name+"="+reply.getObject(name));
} }
*/ */
} }
@SuppressWarnings("unused")
public void testSubscriptionStats() throws Exception{ public void testSubscriptionStats() throws Exception{
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue replyTo = session.createTemporaryQueue(); Queue replyTo = session.createTemporaryQueue();
@ -115,17 +142,15 @@ public class BrokerStatisticsPluginTest extends TestCase{
String name = e.nextElement().toString(); String name = e.nextElement().toString();
System.err.println(name+"="+reply.getObject(name)); System.err.println(name+"="+reply.getObject(name));
}*/ }*/
} }
protected void setUp() throws Exception { protected void setUp() throws Exception {
broker = createBroker(); broker = createBroker();
ConnectionFactory factory = new ActiveMQConnectionFactory(broker.getTransportConnectorURIsAsMap().get("tcp")); ConnectionFactory factory = new ActiveMQConnectionFactory(broker.getTransportConnectorURIsAsMap().get("tcp"));
connection = factory.createConnection(); connection = factory.createConnection();
connection.start(); connection.start();
} }
protected void tearDown() throws Exception{ protected void tearDown() throws Exception{
if (this.connection != null) { if (this.connection != null) {
this.connection.close(); this.connection.close();
@ -134,9 +159,8 @@ public class BrokerStatisticsPluginTest extends TestCase{
this.broker.stop(); this.broker.stop();
} }
} }
protected BrokerService createBroker() throws Exception { protected BrokerService createBroker() throws Exception {
//return createBroker("org/apache/activemq/plugin/statistics-plugin-broker.xml");
BrokerService answer = new BrokerService(); BrokerService answer = new BrokerService();
BrokerPlugin[] plugins = new BrokerPlugin[1]; BrokerPlugin[] plugins = new BrokerPlugin[1];
plugins[0] = new StatisticsBrokerPlugin(); plugins[0] = new StatisticsBrokerPlugin();
@ -146,7 +170,7 @@ public class BrokerStatisticsPluginTest extends TestCase{
answer.start(); answer.start();
return answer; return answer;
} }
protected BrokerService createBroker(String uri) throws Exception { protected BrokerService createBroker(String uri) throws Exception {
LOG.info("Loading broker configuration from the classpath with URI: " + uri); LOG.info("Loading broker configuration from the classpath with URI: " + uri);
return BrokerFactory.createBroker(new URI("xbean:" + uri)); return BrokerFactory.createBroker(new URI("xbean:" + uri));