diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/view/BrokerDestinationView.java b/activemq-broker/src/main/java/org/apache/activemq/broker/view/BrokerDestinationView.java index 6e1737665e..16a3cb07e0 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/view/BrokerDestinationView.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/view/BrokerDestinationView.java @@ -22,113 +22,154 @@ public class BrokerDestinationView { private final Destination destination; - public BrokerDestinationView(Destination destination) { + BrokerDestinationView(Destination destination) { this.destination = destination; } - + /** + * @return the name of the DestinationView + */ public String getName() { return destination.getName(); } + /** + * @return the number of messages enqueued by this destination + */ public long getEnqueueCount() { return destination.getDestinationStatistics().getEnqueues().getCount(); } + /** + * @return the number of messages dequeued (dispatched and removed) by this destination + */ public long getDequeueCount() { return destination.getDestinationStatistics().getDequeues().getCount(); } - + /** + * @return the number of messages dispatched by this destination + */ public long getDispatchCount() { return destination.getDestinationStatistics().getDispatched().getCount(); } - + /** + * @return the number of messages inflight (dispatched by not acknowledged) by this destination + */ public long getInFlightCount() { return destination.getDestinationStatistics().getInflight().getCount(); } - + /** + * @return the number of messages expired by this destination + */ public long getExpiredCount() { return destination.getDestinationStatistics().getExpired().getCount(); } - - public long getConsumerCount() { - return destination.getDestinationStatistics().getConsumers().getCount(); + /** + * @return the number of active consumers on this destination + */ + public int getConsumerCount() { + return (int)destination.getDestinationStatistics().getConsumers().getCount(); } + /** + * @return the number of active consumers on this destination + */ + public int getProducerCount() { + return (int)destination.getDestinationStatistics().getProducers().getCount(); + } + /** + * @return the depth of the Destination + */ public long getQueueSize() { return destination.getDestinationStatistics().getMessages().getCount(); } + /** + * @return the number of messages cached in memory by this destination + */ public long getMessagesCached() { return destination.getDestinationStatistics().getMessagesCached().getCount(); } - + /** + * @return the memory usage as a percentage for this Destination + */ public int getMemoryPercentUsage() { return destination.getMemoryUsage().getPercentUsage(); } - + /** + * @return the memory used by this destination in bytes + */ public long getMemoryUsageByteCount() { return destination.getMemoryUsage().getUsage(); } + /** + * @return the memory limit for this destination in bytes + */ public long getMemoryLimit() { return destination.getMemoryUsage().getLimit(); } - public void setMemoryLimit(long limit) { - destination.getMemoryUsage().setLimit(limit); - } - - + /** + * @return the average time it takes to store a message on this destination (ms) + */ public double getAverageEnqueueTime() { return destination.getDestinationStatistics().getProcessTime().getAverageTime(); } - + /** + * @return the maximum time it takes to store a message on this destination (ms) + */ public long getMaxEnqueueTime() { return destination.getDestinationStatistics().getProcessTime().getMaxTime(); } + /** + * @return the minimum time it takes to store a message on this destination (ms) + */ public long getMinEnqueueTime() { return destination.getDestinationStatistics().getProcessTime().getMinTime(); } - public float getMemoryUsagePortion() { - return destination.getMemoryUsage().getUsagePortion(); - } - - public long getProducerCount() { - return destination.getDestinationStatistics().getProducers().getCount(); - } - - + /** + * @return true if the destination is a Dead Letter Queue + */ public boolean isDLQ() { return destination.isDLQ(); } + /** + * @return the number of messages blocked waiting for dispatch (indication of slow consumption if greater than zero) + */ public long getBlockedSends() { return destination.getDestinationStatistics().getBlockedSends().getCount(); } + /** + * @return the average time(ms) messages are blocked waiting for dispatch (indication of slow consumption if greater than zero) + */ public double getAverageBlockedTime() { return destination.getDestinationStatistics().getBlockedTime().getAverageTime(); } + /** + * @return the total time(ms) messages are blocked waiting for dispatch (indication of slow consumption if greater than zero) + */ public long getTotalBlockedTime() { return destination.getDestinationStatistics().getBlockedTime().getTotalTime(); diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/view/MessageBrokerView.java b/activemq-broker/src/main/java/org/apache/activemq/broker/view/MessageBrokerView.java index 16202519db..570fc5aa3b 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/view/MessageBrokerView.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/view/MessageBrokerView.java @@ -40,34 +40,74 @@ public class MessageBrokerView { private final BrokerService brokerService; private Map destinationViewMap = new LRUCache(); - MessageBrokerView(BrokerService brokerService){ + + /** + * Create a view of a running Broker + * @param brokerService + */ + public MessageBrokerView(BrokerService brokerService){ this.brokerService = brokerService; + if (brokerService == null){ + throw new NullPointerException("BrokerService is null"); + } + if (!brokerService.isStarted()){ + throw new IllegalStateException("BrokerService " + brokerService.getBrokerName() + " is not started"); + } } + + /** + * @return the brokerName + */ public String getBrokerName(){ return brokerService.getBrokerName(); } + /** + * @return the unique id of the Broker + */ + public String getBrokerId(){ + try { + return brokerService.getBroker().getBrokerId().toString(); + } catch (Exception e) { + return ""; + } + } + + /** + * @return the memory used by the Broker as a percentage + */ public int getMemoryPercentUsage() { return brokerService.getSystemUsage().getMemoryUsage().getPercentUsage(); } + /** + * @return the space used by the Message Store as a percentage + */ public int getStorePercentUsage() { return brokerService.getSystemUsage().getStoreUsage().getPercentUsage(); } + /** + * @return the space used by the store for temporary messages as a percentage + */ public int getTempPercentUsage() { return brokerService.getSystemUsage().getTempUsage().getPercentUsage(); } - + /** + * @return the space used by the store of scheduled messages + */ public int getJobSchedulerStorePercentUsage() { return brokerService.getSystemUsage().getJobSchedulerUsage().getPercentUsage(); } + /** + * @return true if the Broker isn't using an in-memory store only for messages + */ public boolean isPersistent() { return brokerService.isPersistent(); } @@ -76,6 +116,10 @@ public class MessageBrokerView { return brokerService; } + /** + * Retrieve a set of all Destinations be used by the Broker + * @return all Destinations + */ public Set getDestinations(){ Set result; @@ -89,6 +133,11 @@ public class MessageBrokerView { return result; } + /** + * Retrieve a set of all Topics be used by the Broker + * @return all Topics + */ + public Set getTopics(){ Set result = new HashSet(); for (ActiveMQDestination destination:getDestinations()){ @@ -99,6 +148,11 @@ public class MessageBrokerView { return result; } + /** + * Retrieve a set of all Queues be used by the Broker + * @return all Queues + */ + public Set getQueues(){ Set result = new HashSet(); for (ActiveMQDestination destination:getDestinations()){ @@ -109,6 +163,10 @@ public class MessageBrokerView { return result; } + /** + * Retrieve a set of all TemporaryTopics be used by the Broker + * @return all TemporaryTopics + */ public Set getTempTopics(){ Set result = new HashSet(); for (ActiveMQDestination destination:getDestinations()){ @@ -119,6 +177,11 @@ public class MessageBrokerView { return result; } + + /** + * Retrieve a set of all TemporaryQueues be used by the Broker + * @return all TemporaryQueues + */ public Set getTempQueues(){ Set result = new HashSet(); for (ActiveMQDestination destination:getDestinations()){ @@ -135,9 +198,10 @@ public class MessageBrokerView { * will default to a Queue * @param destinationName * @return the BrokerDestinationView associated with the destinationName + * @throws Exception */ - public BrokerDestinationView getDestinationView(String destinationName){ + public BrokerDestinationView getDestinationView(String destinationName) throws Exception{ return getDestinationView(destinationName,ActiveMQDestination.QUEUE_TYPE); } @@ -145,9 +209,10 @@ public class MessageBrokerView { * Get the BrokerDestinationView associated with the topic * @param destinationName * @return BrokerDestinationView + * @throws Exception */ - public BrokerDestinationView getTopicDestinationView(String destinationName){ + public BrokerDestinationView getTopicDestinationView(String destinationName) throws Exception{ return getDestinationView(destinationName,ActiveMQDestination.TOPIC_TYPE); } @@ -155,23 +220,38 @@ public class MessageBrokerView { * Get the BrokerDestinationView associated with the queue * @param destinationName * @return BrokerDestinationView + * @throws Exception */ - public BrokerDestinationView getQueueDestinationView(String destinationName){ + public BrokerDestinationView getQueueDestinationView(String destinationName) throws Exception{ return getDestinationView(destinationName,ActiveMQDestination.QUEUE_TYPE); } - public BrokerDestinationView getDestinationView (String destinationName, byte type) { + + /** + * Get the BrokerDestinationView associated with destination + * @param destinationName + * @param type expects either ActiveMQDestination.QUEUE_TYPE, ActiveMQDestination.TOPIC_TYPE etc + * @return BrokerDestinationView + * @throws Exception + */ + public BrokerDestinationView getDestinationView (String destinationName, byte type) throws Exception { ActiveMQDestination activeMQDestination = ActiveMQDestination.createDestination(destinationName,type); return getDestinationView(activeMQDestination); } - public BrokerDestinationView getDestinationView (ActiveMQDestination activeMQDestination) { + /** + * Get the BrokerDestinationView associated with destination + * @param activeMQDestination + * @return BrokerDestinationView + * @throws Exception + */ + public BrokerDestinationView getDestinationView (ActiveMQDestination activeMQDestination) throws Exception { BrokerDestinationView view = null; synchronized(destinationViewMap){ view = destinationViewMap.get(activeMQDestination); if (view==null){ - try { + /** * If auto destinatons are allowed (on by default) - this will create a Broker Destination * if it doesn't exist. We could query the regionBroker first to check - but this affords more @@ -179,12 +259,9 @@ public class MessageBrokerView { * messaging clients have started (and hence created the destination themselves */ Destination destination = brokerService.getDestination(activeMQDestination); - BrokerDestinationView brokerDestinationView = new BrokerDestinationView(destination); - destinationViewMap.put(activeMQDestination,brokerDestinationView); - } catch (Exception e) { - LOG.warn("Failed to get Destination for " + activeMQDestination,e); - } - destinationViewMap.put(activeMQDestination,view); + view = new BrokerDestinationView(destination); + destinationViewMap.put(activeMQDestination,view); + } } return view; diff --git a/activemq-broker/src/test/java/org/apache/activemq/broker/view/BrokerDestinationViewTest.java b/activemq-broker/src/test/java/org/apache/activemq/broker/view/BrokerDestinationViewTest.java new file mode 100644 index 0000000000..85f1c2450a --- /dev/null +++ b/activemq-broker/src/test/java/org/apache/activemq/broker/view/BrokerDestinationViewTest.java @@ -0,0 +1,84 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.view; + +import javax.jms.Connection; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerRegistry; +import org.apache.activemq.broker.BrokerService; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import static org.junit.Assert.assertEquals; + +public class BrokerDestinationViewTest { + + protected BrokerService brokerService; + protected ActiveMQConnectionFactory factory; + protected Connection producerConnection; + + protected Session producerSession; + protected MessageConsumer consumer; + protected MessageProducer producer; + protected Queue queue; + protected int messageCount = 10000; + protected int timeOutInSeconds = 10; + + + + @Before + public void setUp() throws Exception { + brokerService = new BrokerService(); + brokerService.setPersistent(false); + brokerService.start(); + + factory = new ActiveMQConnectionFactory(BrokerRegistry.getInstance().findFirst().getVmConnectorURI()); + producerConnection = factory.createConnection(); + producerConnection.start(); + producerSession = producerConnection.createSession(false,Session.AUTO_ACKNOWLEDGE); + queue = producerSession.createQueue(getClass().getName()); + producer = producerSession.createProducer(queue); + } + + @After + public void tearDown() throws Exception { + if (producerConnection != null){ + producerConnection.close(); + } + if (brokerService != null) { + brokerService.stop(); + } + } + + @Test + public void testBrokerDestinationView() throws Exception { + for (int i = 0; i < messageCount; i++){ + Message message = producerSession.createTextMessage("test " + i); + producer.send(message); + + } + MessageBrokerView messageBrokerView = MessageBrokerViewRegistry.getInstance().lookup(""); + BrokerDestinationView destinationView = messageBrokerView.getQueueDestinationView(getClass().getName()); + assertEquals(destinationView.getQueueSize(),messageCount); + + } +}