From f71e2fe21a93552bf762f63fc0dc6605499143f8 Mon Sep 17 00:00:00 2001 From: Robert Davies Date: Wed, 28 Aug 2013 15:19:00 +0000 Subject: [PATCH] Fix for https://issues.apache.org/jira/browse/AMQ-4697 git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1518261 13f79535-47bb-0310-9956-ffa450edef68 --- .../activemq/broker/jmx/BrokerView.java | 22 ++++++++++++++++ .../activemq/broker/jmx/BrokerViewMBean.java | 14 +++++++++-- .../activemq/broker/jmx/DestinationView.java | 25 ++++++++++++++++--- .../broker/jmx/DestinationViewMBean.java | 13 +++++++--- .../broker/region/DestinationStatistics.java | 12 ++++++++- .../apache/activemq/broker/region/Queue.java | 1 + .../apache/activemq/broker/region/Topic.java | 1 + .../broker/view/BrokerDestinationView.java | 21 ++++++++++++++++ .../activemq/plugin/StatisticsBroker.java | 2 ++ 9 files changed, 102 insertions(+), 9 deletions(-) diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java index b6880006d9..cb96a4276e 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java @@ -145,6 +145,28 @@ public class BrokerView implements BrokerViewMBean { return safeGetBroker().getDestinationStatistics().getMessages().getCount(); } + /** + * @return the average size of a message (bytes) + */ + public double getAverageMessageSize() { + return safeGetBroker().getDestinationStatistics().getMessageSize().getAverageSize(); + } + + /** + * @return the max size of a message (bytes) + */ + public long getMaxMessageSize() { + return safeGetBroker().getDestinationStatistics().getMessageSize().getMaxSize(); + } + + /** + * @return the min size of a message (bytes) + */ + public long getMinMessageSize() { + return safeGetBroker().getDestinationStatistics().getMessageSize().getMinSize(); + } + + public long getTotalMessagesCached() { return safeGetBroker().getDestinationStatistics().getMessagesCached().getCount(); } diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java index 2c1af7d3fd..c787c5abe9 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java @@ -16,11 +16,11 @@ */ package org.apache.activemq.broker.jmx; +import java.util.Map; + import javax.management.ObjectName; import org.apache.activemq.Service; -import java.util.Map; - /** * @author David Martin Clavo david(dot)martin(dot)clavo(at)gmail.com (for the reloadLog4jProperties method) @@ -88,6 +88,16 @@ public interface BrokerViewMBean extends Service { @MBeanInfo("Number of unacknowledged messages on the broker.") long getTotalMessageCount(); + + @MBeanInfo("Average message size on this broker") + double getAverageMessageSize(); + + @MBeanInfo("Max message size on this broker") + public long getMaxMessageSize(); + + @MBeanInfo("Min message size on this broker") + public long getMinMessageSize(); + @MBeanInfo("Percent of memory limit used.") int getMemoryPercentUsage(); diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java index 03e198fb1f..3f62943335 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java @@ -38,7 +38,6 @@ import javax.management.openmbean.OpenDataException; import javax.management.openmbean.TabularData; import javax.management.openmbean.TabularDataSupport; import javax.management.openmbean.TabularType; - import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.broker.jmx.OpenTypeSupport.OpenTypeFactory; import org.apache.activemq.broker.region.Destination; @@ -52,8 +51,6 @@ import org.apache.activemq.command.Message; import org.apache.activemq.filter.BooleanExpression; import org.apache.activemq.filter.MessageEvaluationContext; import org.apache.activemq.selector.SelectorParser; -import org.apache.activemq.util.IntrospectionSupport; -import org.apache.activemq.util.MarshallingSupport; import org.apache.activemq.util.URISupport; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -156,6 +153,28 @@ public class DestinationView implements DestinationViewMBean { return destination.getDestinationStatistics().getProcessTime().getMinTime(); } + /** + * @return the average size of a message (bytes) + */ + public double getAverageMessageSize() { + return destination.getDestinationStatistics().getMessageSize().getAverageSize(); + } + + /** + * @return the max size of a message (bytes) + */ + public long getMaxMessageSize() { + return destination.getDestinationStatistics().getMessageSize().getMaxSize(); + } + + /** + * @return the min size of a message (bytes) + */ + public long getMinMessageSize() { + return destination.getDestinationStatistics().getMessageSize().getMinSize(); + } + + @Override public boolean isPrioritizedMessages() { return destination.isPrioritizedMessages(); diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java index c9e7116bce..f83d47ebc2 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java @@ -254,12 +254,19 @@ public interface DestinationViewMBean { @MBeanInfo("The shortest time a message has been held this destination.") long getMinEnqueueTime(); - /** - * @return average time a message is held by a destination - */ + @MBeanInfo("Average time a message has been held this destination.") double getAverageEnqueueTime(); + @MBeanInfo("Average message size on this destination") + double getAverageMessageSize(); + + @MBeanInfo("Max message size on this destination") + public long getMaxMessageSize(); + + @MBeanInfo("Min message size on this destination") + public long getMinMessageSize(); + /** * @return the producerFlowControl */ diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationStatistics.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationStatistics.java index 572be4ffdf..ee2b478743 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationStatistics.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationStatistics.java @@ -20,7 +20,7 @@ package org.apache.activemq.broker.region; import org.apache.activemq.management.CountStatisticImpl; import org.apache.activemq.management.PollCountStatisticImpl; import org.apache.activemq.management.StatsImpl; -import org.apache.activemq.management.TimeStatisticImpl; +import org.apache.activemq.management.*; /** * The J2EE Statistics for the a Destination. @@ -41,6 +41,7 @@ public class DestinationStatistics extends StatsImpl { protected TimeStatisticImpl processTime; protected CountStatisticImpl blockedSends; protected TimeStatisticImpl blockedTime; + protected SizeStatisticImpl messageSize; public DestinationStatistics() { @@ -61,6 +62,7 @@ public class DestinationStatistics extends StatsImpl { processTime = new TimeStatisticImpl("processTime", "information around length of time messages are held by a destination"); blockedSends = new CountStatisticImpl("blockedSends", "number of messages that have to wait for flow control"); blockedTime = new TimeStatisticImpl("blockedTime","amount of time messages are blocked for flow control"); + messageSize = new SizeStatisticImpl("messageSize","Size of messages passing through the destination"); addStatistic("enqueues", enqueues); addStatistic("dispatched", dispatched); addStatistic("dequeues", dequeues); @@ -73,6 +75,7 @@ public class DestinationStatistics extends StatsImpl { addStatistic("processTime", processTime); addStatistic("blockedSends",blockedSends); addStatistic("blockedTime",blockedTime); + addStatistic("messageSize",messageSize); } public CountStatisticImpl getEnqueues() { @@ -125,6 +128,9 @@ public class DestinationStatistics extends StatsImpl { public TimeStatisticImpl getBlockedTime(){ return this.blockedTime; } + public SizeStatisticImpl getMessageSize(){ + return this.messageSize; + } public void reset() { if (this.isDoReset()) { @@ -136,6 +142,7 @@ public class DestinationStatistics extends StatsImpl { expired.reset(); blockedSends.reset(); blockedTime.reset(); + messageSize.reset(); } } @@ -153,6 +160,7 @@ public class DestinationStatistics extends StatsImpl { processTime.setEnabled(enabled); blockedSends.setEnabled(enabled); blockedTime.setEnabled(enabled); + messageSize.setEnabled(enabled); } @@ -170,6 +178,7 @@ public class DestinationStatistics extends StatsImpl { processTime.setParent(parent.processTime); blockedSends.setParent(parent.blockedSends); blockedTime.setParent(parent.blockedTime); + messageSize.setParent(parent.messageSize); } else { enqueues.setParent(null); dispatched.setParent(null); @@ -183,6 +192,7 @@ public class DestinationStatistics extends StatsImpl { processTime.setParent(null); blockedSends.setParent(null); blockedTime.setParent(null); + messageSize.setParent(null); } } diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java index 818e02e7fa..74d7686c56 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java @@ -1844,6 +1844,7 @@ public class Queue extends BaseDestination implements Task, UsageListener { final void messageSent(final ConnectionContext context, final Message msg) throws Exception { destinationStatistics.getEnqueues().increment(); destinationStatistics.getMessages().increment(); + destinationStatistics.getMessageSize().addSize(msg.getSize()); messageDelivered(context, msg); consumersLock.readLock().lock(); try { diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java index c1e4133592..ebb6dfa3a6 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java @@ -691,6 +691,7 @@ public class Topic extends BaseDestination implements Task { // misleading metrics. // destinationStatistics.getMessages().increment(); destinationStatistics.getEnqueues().increment(); + destinationStatistics.getMessageSize().addSize(message.getSize()); MessageEvaluationContext msgContext = null; dispatchLock.readLock().lock(); diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/view/BrokerDestinationView.java b/activemq-broker/src/main/java/org/apache/activemq/broker/view/BrokerDestinationView.java index 16a3cb07e0..a80e64e741 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/view/BrokerDestinationView.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/view/BrokerDestinationView.java @@ -143,6 +143,27 @@ public class BrokerDestinationView { return destination.getDestinationStatistics().getProcessTime().getMinTime(); } + /** + * @return the average size of a message (bytes) + */ + public double getAverageMessageSize() { + return destination.getDestinationStatistics().getMessageSize().getAverageSize(); + } + + /** + * @return the max size of a message (bytes) + */ + public long getMaxMessageSize() { + return destination.getDestinationStatistics().getMessageSize().getMaxSize(); + } + + /** + * @return the min size of a message (bytes) + */ + public long getMinMessageSize() { + return destination.getDestinationStatistics().getMessageSize().getMinSize(); + } + /** * @return true if the destination is a Dead Letter Queue diff --git a/activemq-broker/src/main/java/org/apache/activemq/plugin/StatisticsBroker.java b/activemq-broker/src/main/java/org/apache/activemq/plugin/StatisticsBroker.java index 5df6996859..9679274c29 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/plugin/StatisticsBroker.java +++ b/activemq-broker/src/main/java/org/apache/activemq/plugin/StatisticsBroker.java @@ -112,6 +112,7 @@ public class StatisticsBroker extends BrokerFilter { statsMessage.setLong("expiredCount", stats.getExpired().getCount()); statsMessage.setLong("inflightCount", stats.getInflight().getCount()); statsMessage.setLong("messagesCached", stats.getMessagesCached().getCount()); + statsMessage.setDouble("averageMessageSize", stats.getMessageSize().getAveragePerSecond()); statsMessage.setInt("memoryPercentUsage", dest.getMemoryUsage().getPercentUsage()); statsMessage.setLong("memoryUsage", dest.getMemoryUsage().getUsage()); statsMessage.setLong("memoryLimit", dest.getMemoryUsage().getLimit()); @@ -150,6 +151,7 @@ public class StatisticsBroker extends BrokerFilter { statsMessage.setLong("dispatchCount", stats.getDispatched().getCount()); statsMessage.setLong("expiredCount", stats.getExpired().getCount()); statsMessage.setLong("inflightCount", stats.getInflight().getCount()); + statsMessage.setDouble("averageMessageSize",stats.getMessageSize().getAverageSize()); statsMessage.setLong("messagesCached", stats.getMessagesCached().getCount()); statsMessage.setInt("memoryPercentUsage", systemUsage.getMemoryUsage().getPercentUsage()); statsMessage.setLong("memoryUsage", systemUsage.getMemoryUsage().getUsage());