From 6fc3744c73594d8e768f7ea866fda3f797951c20 Mon Sep 17 00:00:00 2001 From: Robert Davies Date: Tue, 8 Sep 2009 14:02:21 +0000 Subject: [PATCH] Fix for https://issues.apache.org/activemq/browse/AMQ-2379 git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@812514 13f79535-47bb-0310-9956-ffa450edef68 --- .../activemq/broker/jmx/BrokerView.java | 11 + .../activemq/broker/jmx/BrokerViewMBean.java | 3 + .../activemq/broker/jmx/DestinationView.java | 41 ++-- .../plugin/DiscardingDLQBrokerPlugin.java | 1 + .../ForcePersistencyModeBrokerPlugin.java | 10 +- .../activemq/plugin/StatisticsBroker.java | 195 ++++++++++++++++++ .../plugin/StatisticsBrokerPlugin.java | 42 ++++ .../xbean/spring/http/activemq.org/config/1.0 | 8 + activemq-core/src/main/resources/activemq.xsd | 49 ++++- .../plugin/BrokerStatisticsPluginTest.java | 128 ++++++++++++ .../plugin/statistics-plugin-broker.xml | 36 ++++ 11 files changed, 496 insertions(+), 28 deletions(-) create mode 100644 activemq-core/src/main/java/org/apache/activemq/plugin/StatisticsBroker.java create mode 100644 activemq-core/src/main/java/org/apache/activemq/plugin/StatisticsBrokerPlugin.java create mode 100644 activemq-core/src/test/java/org/apache/activemq/plugin/BrokerStatisticsPluginTest.java create mode 100644 activemq-core/src/test/resources/org/apache/activemq/plugin/statistics-plugin-broker.xml diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java b/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java index e9cf7d30f9..7c3936c18e 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java @@ -16,6 +16,8 @@ */ package org.apache.activemq.broker.jmx; +import java.io.File; +import java.io.IOException; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.net.URI; @@ -346,4 +348,13 @@ public class BrokerView implements BrokerViewMBean { URI answer = brokerService.getVmConnectorURI(); return answer != null ? answer.toString() : ""; } + + public String getDataDirectory() { + File file = brokerService.getDataDirectoryFile(); + try { + return file != null ? file.getCanonicalPath():""; + } catch (IOException e) { + return ""; + } + } } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java b/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java index 33e46be59d..e37562ae3e 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java @@ -238,4 +238,7 @@ public interface BrokerViewMBean extends Service { @MBeanInfo("The url of the VM connector") String getVMURL(); + @MBeanInfo("The location of the data directory") + public String getDataDirectory(); + } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java b/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java index a4158ec277..ec0c8a9f4f 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java @@ -16,31 +16,9 @@ */ package org.apache.activemq.broker.jmx; -import java.util.ArrayList; -import java.util.Collections; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.io.IOException; - -import javax.jms.Connection; -import javax.jms.InvalidSelectorException; -import javax.jms.MessageProducer; -import javax.jms.Session; -import javax.management.openmbean.CompositeData; -import javax.management.openmbean.CompositeDataSupport; -import javax.management.openmbean.CompositeType; -import javax.management.openmbean.OpenDataException; -import javax.management.openmbean.TabularData; -import javax.management.openmbean.TabularDataSupport; -import javax.management.openmbean.TabularType; -import javax.management.ObjectName; -import javax.management.MalformedObjectNameException; - import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.broker.jmx.OpenTypeSupport.OpenTypeFactory; import org.apache.activemq.broker.region.Destination; -import org.apache.activemq.broker.region.Queue; import org.apache.activemq.broker.region.Subscription; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQMessage; @@ -51,6 +29,25 @@ import org.apache.activemq.filter.MessageEvaluationContext; import org.apache.activemq.selector.SelectorParser; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import javax.jms.Connection; +import javax.jms.InvalidSelectorException; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.management.MalformedObjectNameException; +import javax.management.ObjectName; +import javax.management.openmbean.CompositeData; +import javax.management.openmbean.CompositeDataSupport; +import javax.management.openmbean.CompositeType; +import javax.management.openmbean.OpenDataException; +import javax.management.openmbean.TabularData; +import javax.management.openmbean.TabularDataSupport; +import javax.management.openmbean.TabularType; public class DestinationView implements DestinationViewMBean { private static final Log LOG = LogFactory.getLog(DestinationViewMBean.class); diff --git a/activemq-core/src/main/java/org/apache/activemq/plugin/DiscardingDLQBrokerPlugin.java b/activemq-core/src/main/java/org/apache/activemq/plugin/DiscardingDLQBrokerPlugin.java index b2d31729b8..4359b2cea7 100644 --- a/activemq-core/src/main/java/org/apache/activemq/plugin/DiscardingDLQBrokerPlugin.java +++ b/activemq-core/src/main/java/org/apache/activemq/plugin/DiscardingDLQBrokerPlugin.java @@ -27,6 +27,7 @@ import org.apache.commons.logging.LogFactory; /** * @author Filip Hanik + * @org.apache.xbean.XBean element="discardingDLQBrokerPlugin" * @version 1.0 */ public class DiscardingDLQBrokerPlugin implements BrokerPlugin { diff --git a/activemq-core/src/main/java/org/apache/activemq/plugin/ForcePersistencyModeBrokerPlugin.java b/activemq-core/src/main/java/org/apache/activemq/plugin/ForcePersistencyModeBrokerPlugin.java index 5cefd346c1..7ee24e1eb8 100644 --- a/activemq-core/src/main/java/org/apache/activemq/plugin/ForcePersistencyModeBrokerPlugin.java +++ b/activemq-core/src/main/java/org/apache/activemq/plugin/ForcePersistencyModeBrokerPlugin.java @@ -28,12 +28,16 @@ import org.apache.commons.logging.LogFactory; * * Useful, if you have set the broker usage policy to process ONLY persistent or ONLY non-persistent * messages. + * @org.apache.xbean.XBean element="forcePersistencyModeBrokerPlugin" */ public class ForcePersistencyModeBrokerPlugin implements BrokerPlugin { - public static Log log = LogFactory.getLog(ForcePersistencyModeBrokerPlugin.class); + private static Log LOG = LogFactory.getLog(ForcePersistencyModeBrokerPlugin.class); private boolean persistenceFlag = false; - public ForcePersistencyModeBrokerPlugin() { + /** + * Constructor + */ +public ForcePersistencyModeBrokerPlugin() { } /** @@ -46,7 +50,7 @@ public class ForcePersistencyModeBrokerPlugin implements BrokerPlugin { public Broker installPlugin(Broker broker) throws Exception{ ForcePersistencyModeBroker pB = new ForcePersistencyModeBroker(broker); pB.setPersistenceFlag(isPersistenceForced()); - log.info("Installing ForcePersistencyModeBroker plugin: persistency enforced=" + pB.isPersistent()); + LOG.info("Installing ForcePersistencyModeBroker plugin: persistency enforced=" + pB.isPersistent()); return pB; } diff --git a/activemq-core/src/main/java/org/apache/activemq/plugin/StatisticsBroker.java b/activemq-core/src/main/java/org/apache/activemq/plugin/StatisticsBroker.java new file mode 100644 index 0000000000..c04b87b51b --- /dev/null +++ b/activemq-core/src/main/java/org/apache/activemq/plugin/StatisticsBroker.java @@ -0,0 +1,195 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.plugin; + +import org.apache.activemq.advisory.AdvisorySupport; +import org.apache.activemq.broker.Broker; +import org.apache.activemq.broker.BrokerFilter; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.ConnectionContext; +import org.apache.activemq.broker.ProducerBrokerExchange; +import org.apache.activemq.broker.region.Destination; +import org.apache.activemq.broker.region.DestinationStatistics; +import org.apache.activemq.broker.region.RegionBroker; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ActiveMQMapMessage; +import org.apache.activemq.command.Message; +import org.apache.activemq.command.MessageId; +import org.apache.activemq.command.ProducerId; +import org.apache.activemq.command.ProducerInfo; +import org.apache.activemq.state.ProducerState; +import org.apache.activemq.usage.SystemUsage; +import org.apache.activemq.util.IdGenerator; +import org.apache.activemq.util.LongSequenceGenerator; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import java.io.File; +import java.net.URI; +import java.util.Set; +/** + * A StatisticsBroker You can retrieve a Map Message for a Destination - or + * Broker containing statistics as key-value pairs The message must contain a + * replyTo Destination - else its ignored + * + */ +public class StatisticsBroker extends BrokerFilter { + private static Log LOG = LogFactory.getLog(StatisticsBroker.class); + static final String STATS_DESTINATION_PREFIX = "ActiveMQ.Statistics.Destination"; + static final String STATS_BROKER_PREFIX = "ActiveMQ.Statistics.Broker"; + private static final IdGenerator ID_GENERATOR = new IdGenerator(); + private final LongSequenceGenerator messageIdGenerator = new LongSequenceGenerator(); + protected final ProducerId advisoryProducerId = new ProducerId(); + + /** + * + * Constructor + * + * @param next + */ + public StatisticsBroker(Broker next) { + super(next); + this.advisoryProducerId.setConnectionId(ID_GENERATOR.generateId()); + } + + /** + * Sets the persistence mode + * + * @see org.apache.activemq.broker.BrokerFilter#send(org.apache.activemq.broker.ProducerBrokerExchange, + * org.apache.activemq.command.Message) + */ + public void send(ProducerBrokerExchange producerExchange, Message messageSend) throws Exception { + ActiveMQDestination msgDest = messageSend.getDestination(); + ActiveMQDestination replyTo = messageSend.getReplyTo(); + if (replyTo != null) { + String physicalName = msgDest.getPhysicalName(); + boolean destStats = physicalName.regionMatches(true, 0, STATS_DESTINATION_PREFIX, 0, + STATS_DESTINATION_PREFIX.length()); + boolean brokerStats = physicalName.regionMatches(true, 0, STATS_BROKER_PREFIX, 0, STATS_BROKER_PREFIX + .length()); + if (destStats) { + String queueryName = physicalName.substring(STATS_DESTINATION_PREFIX.length(), physicalName.length()); + ActiveMQDestination queryDest = ActiveMQDestination.createDestination(queueryName,msgDest.getDestinationType()); + Set set = getDestinations(queryDest); + for (Destination dest : set) { + DestinationStatistics stats = dest.getDestinationStatistics(); + if (stats != null) { + ActiveMQMapMessage statsMessage = new ActiveMQMapMessage(); + statsMessage.setString("destinationName", dest.getActiveMQDestination().toString()); + statsMessage.setLong("size", stats.getMessages().getCount()); + statsMessage.setLong("enqueueCount", stats.getEnqueues().getCount()); + statsMessage.setLong("dequeueCount", stats.getDequeues().getCount()); + statsMessage.setLong("dispatchCount", stats.getDispatched().getCount()); + statsMessage.setLong("expiredCount", stats.getExpired().getCount()); + statsMessage.setLong("inflightCount", stats.getInflight().getCount()); + statsMessage.setLong("messagesCached", stats.getMessagesCached().getCount()); + statsMessage.setInt("memoryPercentUsage", dest.getMemoryUsage().getPercentUsage()); + statsMessage.setLong("memoryUsage", dest.getMemoryUsage().getUsage()); + statsMessage.setLong("memoryLimit", dest.getMemoryUsage().getLimit()); + statsMessage.setDouble("averageEnqueueTime", stats.getProcessTime().getAverageTime()); + statsMessage.setDouble("maxEnqueueTime", stats.getProcessTime().getMaxTime()); + statsMessage.setDouble("minEnqueueTime", stats.getProcessTime().getMinTime()); + statsMessage.setLong("consumerCount", stats.getConsumers().getCount()); + statsMessage.setLong("producerCount", stats.getProducers().getCount()); + sendStats(producerExchange.getConnectionContext(), statsMessage, replyTo); + } + } + } else if (brokerStats) { + ActiveMQMapMessage statsMessage = new ActiveMQMapMessage(); + BrokerService brokerService = getBrokerService(); + RegionBroker regionBroker = (RegionBroker) brokerService.getRegionBroker(); + SystemUsage systemUsage = brokerService.getSystemUsage(); + DestinationStatistics stats = regionBroker.getDestinationStatistics(); + statsMessage.setString("brokerName", regionBroker.getBrokerName()); + statsMessage.setString("brokerId", regionBroker.getBrokerId().toString()); + statsMessage.setLong("size", stats.getMessages().getCount()); + statsMessage.setLong("enqueueCount", stats.getEnqueues().getCount()); + statsMessage.setLong("dequeueCount", stats.getDequeues().getCount()); + statsMessage.setLong("dispatchCount", stats.getDispatched().getCount()); + statsMessage.setLong("expiredCount", stats.getExpired().getCount()); + statsMessage.setLong("inflightCount", stats.getInflight().getCount()); + statsMessage.setLong("messagesCached", stats.getMessagesCached().getCount()); + statsMessage.setInt("memoryPercentUsage", systemUsage.getMemoryUsage().getPercentUsage()); + statsMessage.setLong("memoryUsage", systemUsage.getMemoryUsage().getUsage()); + statsMessage.setLong("memoryLimit", systemUsage.getMemoryUsage().getLimit()); + statsMessage.setInt("storePercentUsage", systemUsage.getStoreUsage().getPercentUsage()); + statsMessage.setLong("storeUsage", systemUsage.getStoreUsage().getUsage()); + statsMessage.setLong("storeLimit", systemUsage.getStoreUsage().getLimit()); + statsMessage.setInt("tempPercentUsage", systemUsage.getTempUsage().getPercentUsage()); + statsMessage.setLong("tempUsage", systemUsage.getTempUsage().getUsage()); + statsMessage.setLong("tempLimit", systemUsage.getTempUsage().getLimit()); + statsMessage.setDouble("averageEnqueueTime", stats.getProcessTime().getAverageTime()); + statsMessage.setDouble("maxEnqueueTime", stats.getProcessTime().getMaxTime()); + statsMessage.setDouble("minEnqueueTime", stats.getProcessTime().getMinTime()); + statsMessage.setLong("consumerCount", stats.getConsumers().getCount()); + statsMessage.setLong("producerCount", stats.getProducers().getCount()); + String answer = brokerService.getTransportConnectorURIsAsMap().get("tcp"); + answer = answer != null ? answer : ""; + statsMessage.setString("openwire", answer); + answer = brokerService.getTransportConnectorURIsAsMap().get("stomp"); + answer = answer != null ? answer : ""; + statsMessage.setString("stomp", answer); + answer = brokerService.getTransportConnectorURIsAsMap().get("ssl"); + answer = answer != null ? answer : ""; + statsMessage.setString("ssl", answer); + answer = brokerService.getTransportConnectorURIsAsMap().get("stomp+ssl"); + answer = answer != null ? answer : ""; + statsMessage.setString("stomp+ssl", answer); + URI uri = brokerService.getVmConnectorURI(); + answer = uri != null ? uri.toString() : ""; + statsMessage.setString("vm", answer); + File file = brokerService.getDataDirectoryFile(); + answer = file != null ? file.getCanonicalPath() : ""; + statsMessage.setString("dataDirectory", answer); + sendStats(producerExchange.getConnectionContext(), statsMessage, replyTo); + } else { + super.send(producerExchange, messageSend); + } + } else { + super.send(producerExchange, messageSend); + } + } + + public void start() throws Exception { + super.start(); + LOG.info("Starting StatisticsBroker"); + } + + public void stop() throws Exception { + super.stop(); + } + + protected void sendStats(ConnectionContext context, ActiveMQMapMessage msg, ActiveMQDestination replyTo) + throws Exception { + msg.setPersistent(false); + msg.setType(AdvisorySupport.ADIVSORY_MESSAGE_TYPE); + msg.setMessageId(new MessageId(this.advisoryProducerId, this.messageIdGenerator.getNextSequenceId())); + msg.setDestination(replyTo); + msg.setResponseRequired(false); + msg.setProducerId(this.advisoryProducerId); + boolean originalFlowControl = context.isProducerFlowControl(); + final ProducerBrokerExchange producerExchange = new ProducerBrokerExchange(); + producerExchange.setConnectionContext(context); + producerExchange.setMutable(true); + producerExchange.setProducerState(new ProducerState(new ProducerInfo())); + try { + context.setProducerFlowControl(false); + this.next.send(producerExchange, msg); + } finally { + context.setProducerFlowControl(originalFlowControl); + } + } +} diff --git a/activemq-core/src/main/java/org/apache/activemq/plugin/StatisticsBrokerPlugin.java b/activemq-core/src/main/java/org/apache/activemq/plugin/StatisticsBrokerPlugin.java new file mode 100644 index 0000000000..43f9a7f10b --- /dev/null +++ b/activemq-core/src/main/java/org/apache/activemq/plugin/StatisticsBrokerPlugin.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.plugin; + +import org.apache.activemq.broker.Broker; +import org.apache.activemq.broker.BrokerPlugin; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +/** + * A StatisticsBrokerPlugin + * @org.apache.xbean.XBean element="statisticsBrokerPlugin" + * + */ +public class StatisticsBrokerPlugin implements BrokerPlugin { + private static Log LOG = LogFactory.getLog(StatisticsBrokerPlugin.class); + /** + * @param broker + * @return the plug-in + * @throws Exception + * @see org.apache.activemq.broker.BrokerPlugin#installPlugin(org.apache.activemq.broker.Broker) + */ + public Broker installPlugin(Broker broker) throws Exception { + StatisticsBroker answer = new StatisticsBroker(broker); + LOG.info("Installing StaticsBroker"); + return answer; + } +} diff --git a/activemq-core/src/main/resources/META-INF/services/org/apache/xbean/spring/http/activemq.org/config/1.0 b/activemq-core/src/main/resources/META-INF/services/org/apache/xbean/spring/http/activemq.org/config/1.0 index 7605a2603e..ce6ceffae9 100644 --- a/activemq-core/src/main/resources/META-INF/services/org/apache/xbean/spring/http/activemq.org/config/1.0 +++ b/activemq-core/src/main/resources/META-INF/services/org/apache/xbean/spring/http/activemq.org/config/1.0 @@ -267,3 +267,11 @@ vmQueueCursor = org.apache.activemq.broker.region.policy.VMPendingQueueMessageSt xaConnectionFactory = org.apache.activemq.spring.ActiveMQXAConnectionFactory +statisticsBrokerPlugin = org.apache.activemq.plugin.StatisticsBrokerPlugin + +forcePersistencyModeBrokerPlugin = org.apache.activemq.plugin.ForcePersistencyModeBrokerPlugin + +discardingDLQBrokerPlugin = org.apache.activemq.plugin.DiscardingDLQBrokerPlugin + + + diff --git a/activemq-core/src/main/resources/activemq.xsd b/activemq-core/src/main/resources/activemq.xsd index 7e6a0f8df5..74026c041b 100644 --- a/activemq-core/src/main/resources/activemq.xsd +++ b/activemq-core/src/main/resources/activemq.xsd @@ -569,7 +569,7 @@ other brokers in a federated network @@ -586,6 +586,9 @@ authentication or authorization + + + @@ -617,6 +620,9 @@ other brokers in a federated network + + + @@ -5888,6 +5894,43 @@ warning if the user forgets. To disable the warning just set the value to < - - + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/activemq-core/src/test/java/org/apache/activemq/plugin/BrokerStatisticsPluginTest.java b/activemq-core/src/test/java/org/apache/activemq/plugin/BrokerStatisticsPluginTest.java new file mode 100644 index 0000000000..1168f4c82d --- /dev/null +++ b/activemq-core/src/test/java/org/apache/activemq/plugin/BrokerStatisticsPluginTest.java @@ -0,0 +1,128 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.plugin; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerFactory; +import org.apache.activemq.broker.BrokerPlugin; +import org.apache.activemq.broker.BrokerService; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import java.net.URI; +import java.util.Enumeration; +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.MapMessage; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import junit.framework.TestCase; + +/** + * A BrokerStatisticsPluginTest + * A testcase for https://issues.apache.org/activemq/browse/AMQ-2379 + * + */ +public class BrokerStatisticsPluginTest extends TestCase{ + private static final Log LOG = LogFactory.getLog(BrokerStatisticsPluginTest.class); + + private Connection connection; + private BrokerService broker; + + public void testBrokerStats() throws Exception{ + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue replyTo = session.createTemporaryQueue(); + MessageConsumer consumer = session.createConsumer(replyTo); + Queue query = session.createQueue(StatisticsBroker.STATS_BROKER_PREFIX); + MessageProducer producer = session.createProducer(query); + Message msg = session.createMessage(); + msg.setJMSReplyTo(replyTo); + producer.send(msg); + MapMessage reply = (MapMessage) consumer.receive(10*1000); + assertNotNull(reply); + assertTrue(reply.getMapNames().hasMoreElements()); + /* + for (Enumeration e = reply.getMapNames();e.hasMoreElements();) { + String name = e.nextElement().toString(); + System.err.println(name+"="+reply.getObject(name)); + } + */ + + + } + + public void testDestinationStats() throws Exception{ + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue replyTo = session.createTemporaryQueue(); + MessageConsumer consumer = session.createConsumer(replyTo); + Queue testQueue = session.createQueue("Test.Queue"); + MessageProducer producer = session.createProducer(null); + Queue query = session.createQueue(StatisticsBroker.STATS_DESTINATION_PREFIX + testQueue.getQueueName()); + Message msg = session.createMessage(); + + producer.send(testQueue,msg); + + msg.setJMSReplyTo(replyTo); + producer.send(query,msg); + MapMessage reply = (MapMessage) consumer.receive(); + assertNotNull(reply); + assertTrue(reply.getMapNames().hasMoreElements()); + /* + for (Enumeration e = reply.getMapNames();e.hasMoreElements();) { + String name = e.nextElement().toString(); + System.err.println(name+"="+reply.getObject(name)); + } + */ + + + } + + protected void setUp() throws Exception { + broker = createBroker(); + ConnectionFactory factory = new ActiveMQConnectionFactory(broker.getTransportConnectorURIsAsMap().get("tcp")); + connection = factory.createConnection(); + connection.start(); + } + + protected void tearDown() throws Exception{ + if (this.connection != null) { + this.connection.close(); + } + if (this.broker!=null) { + this.broker.stop(); + } + } + + protected BrokerService createBroker() throws Exception { + //return createBroker("org/apache/activemq/plugin/statistics-plugin-broker.xml"); + BrokerService answer = new BrokerService(); + BrokerPlugin[] plugins = new BrokerPlugin[1]; + plugins[0] = new StatisticsBrokerPlugin(); + answer.setPlugins(plugins); + answer.setDeleteAllMessagesOnStartup(true); + answer.addConnector("tcp://localhost:0"); + answer.start(); + return answer; + } + + protected BrokerService createBroker(String uri) throws Exception { + LOG.info("Loading broker configuration from the classpath with URI: " + uri); + return BrokerFactory.createBroker(new URI("xbean:" + uri)); + } +} \ No newline at end of file diff --git a/activemq-core/src/test/resources/org/apache/activemq/plugin/statistics-plugin-broker.xml b/activemq-core/src/test/resources/org/apache/activemq/plugin/statistics-plugin-broker.xml new file mode 100644 index 0000000000..ad3933a0c1 --- /dev/null +++ b/activemq-core/src/test/resources/org/apache/activemq/plugin/statistics-plugin-broker.xml @@ -0,0 +1,36 @@ + + + + + + + + + + + + + + + + + + + + + \ No newline at end of file