git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1518261 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2013-08-28 15:19:00 +00:00
parent 0d98623b7b
commit f71e2fe21a
9 changed files with 102 additions and 9 deletions

View File

@ -145,6 +145,28 @@ public class BrokerView implements BrokerViewMBean {
return safeGetBroker().getDestinationStatistics().getMessages().getCount(); return safeGetBroker().getDestinationStatistics().getMessages().getCount();
} }
/**
* @return the average size of a message (bytes)
*/
public double getAverageMessageSize() {
return safeGetBroker().getDestinationStatistics().getMessageSize().getAverageSize();
}
/**
* @return the max size of a message (bytes)
*/
public long getMaxMessageSize() {
return safeGetBroker().getDestinationStatistics().getMessageSize().getMaxSize();
}
/**
* @return the min size of a message (bytes)
*/
public long getMinMessageSize() {
return safeGetBroker().getDestinationStatistics().getMessageSize().getMinSize();
}
public long getTotalMessagesCached() { public long getTotalMessagesCached() {
return safeGetBroker().getDestinationStatistics().getMessagesCached().getCount(); return safeGetBroker().getDestinationStatistics().getMessagesCached().getCount();
} }

View File

@ -16,11 +16,11 @@
*/ */
package org.apache.activemq.broker.jmx; package org.apache.activemq.broker.jmx;
import java.util.Map;
import javax.management.ObjectName; import javax.management.ObjectName;
import org.apache.activemq.Service; import org.apache.activemq.Service;
import java.util.Map;
/** /**
* @author David Martin Clavo david(dot)martin(dot)clavo(at)gmail.com (for the reloadLog4jProperties method) * @author David Martin Clavo david(dot)martin(dot)clavo(at)gmail.com (for the reloadLog4jProperties method)
@ -88,6 +88,16 @@ public interface BrokerViewMBean extends Service {
@MBeanInfo("Number of unacknowledged messages on the broker.") @MBeanInfo("Number of unacknowledged messages on the broker.")
long getTotalMessageCount(); long getTotalMessageCount();
@MBeanInfo("Average message size on this broker")
double getAverageMessageSize();
@MBeanInfo("Max message size on this broker")
public long getMaxMessageSize();
@MBeanInfo("Min message size on this broker")
public long getMinMessageSize();
@MBeanInfo("Percent of memory limit used.") @MBeanInfo("Percent of memory limit used.")
int getMemoryPercentUsage(); int getMemoryPercentUsage();

View File

@ -38,7 +38,6 @@ import javax.management.openmbean.OpenDataException;
import javax.management.openmbean.TabularData; import javax.management.openmbean.TabularData;
import javax.management.openmbean.TabularDataSupport; import javax.management.openmbean.TabularDataSupport;
import javax.management.openmbean.TabularType; import javax.management.openmbean.TabularType;
import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.jmx.OpenTypeSupport.OpenTypeFactory; import org.apache.activemq.broker.jmx.OpenTypeSupport.OpenTypeFactory;
import org.apache.activemq.broker.region.Destination; import org.apache.activemq.broker.region.Destination;
@ -52,8 +51,6 @@ import org.apache.activemq.command.Message;
import org.apache.activemq.filter.BooleanExpression; import org.apache.activemq.filter.BooleanExpression;
import org.apache.activemq.filter.MessageEvaluationContext; import org.apache.activemq.filter.MessageEvaluationContext;
import org.apache.activemq.selector.SelectorParser; import org.apache.activemq.selector.SelectorParser;
import org.apache.activemq.util.IntrospectionSupport;
import org.apache.activemq.util.MarshallingSupport;
import org.apache.activemq.util.URISupport; import org.apache.activemq.util.URISupport;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -156,6 +153,28 @@ public class DestinationView implements DestinationViewMBean {
return destination.getDestinationStatistics().getProcessTime().getMinTime(); return destination.getDestinationStatistics().getProcessTime().getMinTime();
} }
/**
* @return the average size of a message (bytes)
*/
public double getAverageMessageSize() {
return destination.getDestinationStatistics().getMessageSize().getAverageSize();
}
/**
* @return the max size of a message (bytes)
*/
public long getMaxMessageSize() {
return destination.getDestinationStatistics().getMessageSize().getMaxSize();
}
/**
* @return the min size of a message (bytes)
*/
public long getMinMessageSize() {
return destination.getDestinationStatistics().getMessageSize().getMinSize();
}
@Override @Override
public boolean isPrioritizedMessages() { public boolean isPrioritizedMessages() {
return destination.isPrioritizedMessages(); return destination.isPrioritizedMessages();

View File

@ -254,12 +254,19 @@ public interface DestinationViewMBean {
@MBeanInfo("The shortest time a message has been held this destination.") @MBeanInfo("The shortest time a message has been held this destination.")
long getMinEnqueueTime(); long getMinEnqueueTime();
/**
* @return average time a message is held by a destination
*/
@MBeanInfo("Average time a message has been held this destination.") @MBeanInfo("Average time a message has been held this destination.")
double getAverageEnqueueTime(); double getAverageEnqueueTime();
@MBeanInfo("Average message size on this destination")
double getAverageMessageSize();
@MBeanInfo("Max message size on this destination")
public long getMaxMessageSize();
@MBeanInfo("Min message size on this destination")
public long getMinMessageSize();
/** /**
* @return the producerFlowControl * @return the producerFlowControl
*/ */

View File

@ -20,7 +20,7 @@ package org.apache.activemq.broker.region;
import org.apache.activemq.management.CountStatisticImpl; import org.apache.activemq.management.CountStatisticImpl;
import org.apache.activemq.management.PollCountStatisticImpl; import org.apache.activemq.management.PollCountStatisticImpl;
import org.apache.activemq.management.StatsImpl; import org.apache.activemq.management.StatsImpl;
import org.apache.activemq.management.TimeStatisticImpl; import org.apache.activemq.management.*;
/** /**
* The J2EE Statistics for the a Destination. * The J2EE Statistics for the a Destination.
@ -41,6 +41,7 @@ public class DestinationStatistics extends StatsImpl {
protected TimeStatisticImpl processTime; protected TimeStatisticImpl processTime;
protected CountStatisticImpl blockedSends; protected CountStatisticImpl blockedSends;
protected TimeStatisticImpl blockedTime; protected TimeStatisticImpl blockedTime;
protected SizeStatisticImpl messageSize;
public DestinationStatistics() { public DestinationStatistics() {
@ -61,6 +62,7 @@ public class DestinationStatistics extends StatsImpl {
processTime = new TimeStatisticImpl("processTime", "information around length of time messages are held by a destination"); processTime = new TimeStatisticImpl("processTime", "information around length of time messages are held by a destination");
blockedSends = new CountStatisticImpl("blockedSends", "number of messages that have to wait for flow control"); blockedSends = new CountStatisticImpl("blockedSends", "number of messages that have to wait for flow control");
blockedTime = new TimeStatisticImpl("blockedTime","amount of time messages are blocked for flow control"); blockedTime = new TimeStatisticImpl("blockedTime","amount of time messages are blocked for flow control");
messageSize = new SizeStatisticImpl("messageSize","Size of messages passing through the destination");
addStatistic("enqueues", enqueues); addStatistic("enqueues", enqueues);
addStatistic("dispatched", dispatched); addStatistic("dispatched", dispatched);
addStatistic("dequeues", dequeues); addStatistic("dequeues", dequeues);
@ -73,6 +75,7 @@ public class DestinationStatistics extends StatsImpl {
addStatistic("processTime", processTime); addStatistic("processTime", processTime);
addStatistic("blockedSends",blockedSends); addStatistic("blockedSends",blockedSends);
addStatistic("blockedTime",blockedTime); addStatistic("blockedTime",blockedTime);
addStatistic("messageSize",messageSize);
} }
public CountStatisticImpl getEnqueues() { public CountStatisticImpl getEnqueues() {
@ -125,6 +128,9 @@ public class DestinationStatistics extends StatsImpl {
public TimeStatisticImpl getBlockedTime(){ public TimeStatisticImpl getBlockedTime(){
return this.blockedTime; return this.blockedTime;
} }
public SizeStatisticImpl getMessageSize(){
return this.messageSize;
}
public void reset() { public void reset() {
if (this.isDoReset()) { if (this.isDoReset()) {
@ -136,6 +142,7 @@ public class DestinationStatistics extends StatsImpl {
expired.reset(); expired.reset();
blockedSends.reset(); blockedSends.reset();
blockedTime.reset(); blockedTime.reset();
messageSize.reset();
} }
} }
@ -153,6 +160,7 @@ public class DestinationStatistics extends StatsImpl {
processTime.setEnabled(enabled); processTime.setEnabled(enabled);
blockedSends.setEnabled(enabled); blockedSends.setEnabled(enabled);
blockedTime.setEnabled(enabled); blockedTime.setEnabled(enabled);
messageSize.setEnabled(enabled);
} }
@ -170,6 +178,7 @@ public class DestinationStatistics extends StatsImpl {
processTime.setParent(parent.processTime); processTime.setParent(parent.processTime);
blockedSends.setParent(parent.blockedSends); blockedSends.setParent(parent.blockedSends);
blockedTime.setParent(parent.blockedTime); blockedTime.setParent(parent.blockedTime);
messageSize.setParent(parent.messageSize);
} else { } else {
enqueues.setParent(null); enqueues.setParent(null);
dispatched.setParent(null); dispatched.setParent(null);
@ -183,6 +192,7 @@ public class DestinationStatistics extends StatsImpl {
processTime.setParent(null); processTime.setParent(null);
blockedSends.setParent(null); blockedSends.setParent(null);
blockedTime.setParent(null); blockedTime.setParent(null);
messageSize.setParent(null);
} }
} }

View File

@ -1844,6 +1844,7 @@ public class Queue extends BaseDestination implements Task, UsageListener {
final void messageSent(final ConnectionContext context, final Message msg) throws Exception { final void messageSent(final ConnectionContext context, final Message msg) throws Exception {
destinationStatistics.getEnqueues().increment(); destinationStatistics.getEnqueues().increment();
destinationStatistics.getMessages().increment(); destinationStatistics.getMessages().increment();
destinationStatistics.getMessageSize().addSize(msg.getSize());
messageDelivered(context, msg); messageDelivered(context, msg);
consumersLock.readLock().lock(); consumersLock.readLock().lock();
try { try {

View File

@ -691,6 +691,7 @@ public class Topic extends BaseDestination implements Task {
// misleading metrics. // misleading metrics.
// destinationStatistics.getMessages().increment(); // destinationStatistics.getMessages().increment();
destinationStatistics.getEnqueues().increment(); destinationStatistics.getEnqueues().increment();
destinationStatistics.getMessageSize().addSize(message.getSize());
MessageEvaluationContext msgContext = null; MessageEvaluationContext msgContext = null;
dispatchLock.readLock().lock(); dispatchLock.readLock().lock();

View File

@ -143,6 +143,27 @@ public class BrokerDestinationView {
return destination.getDestinationStatistics().getProcessTime().getMinTime(); return destination.getDestinationStatistics().getProcessTime().getMinTime();
} }
/**
* @return the average size of a message (bytes)
*/
public double getAverageMessageSize() {
return destination.getDestinationStatistics().getMessageSize().getAverageSize();
}
/**
* @return the max size of a message (bytes)
*/
public long getMaxMessageSize() {
return destination.getDestinationStatistics().getMessageSize().getMaxSize();
}
/**
* @return the min size of a message (bytes)
*/
public long getMinMessageSize() {
return destination.getDestinationStatistics().getMessageSize().getMinSize();
}
/** /**
* @return true if the destination is a Dead Letter Queue * @return true if the destination is a Dead Letter Queue

View File

@ -112,6 +112,7 @@ public class StatisticsBroker extends BrokerFilter {
statsMessage.setLong("expiredCount", stats.getExpired().getCount()); statsMessage.setLong("expiredCount", stats.getExpired().getCount());
statsMessage.setLong("inflightCount", stats.getInflight().getCount()); statsMessage.setLong("inflightCount", stats.getInflight().getCount());
statsMessage.setLong("messagesCached", stats.getMessagesCached().getCount()); statsMessage.setLong("messagesCached", stats.getMessagesCached().getCount());
statsMessage.setDouble("averageMessageSize", stats.getMessageSize().getAveragePerSecond());
statsMessage.setInt("memoryPercentUsage", dest.getMemoryUsage().getPercentUsage()); statsMessage.setInt("memoryPercentUsage", dest.getMemoryUsage().getPercentUsage());
statsMessage.setLong("memoryUsage", dest.getMemoryUsage().getUsage()); statsMessage.setLong("memoryUsage", dest.getMemoryUsage().getUsage());
statsMessage.setLong("memoryLimit", dest.getMemoryUsage().getLimit()); statsMessage.setLong("memoryLimit", dest.getMemoryUsage().getLimit());
@ -150,6 +151,7 @@ public class StatisticsBroker extends BrokerFilter {
statsMessage.setLong("dispatchCount", stats.getDispatched().getCount()); statsMessage.setLong("dispatchCount", stats.getDispatched().getCount());
statsMessage.setLong("expiredCount", stats.getExpired().getCount()); statsMessage.setLong("expiredCount", stats.getExpired().getCount());
statsMessage.setLong("inflightCount", stats.getInflight().getCount()); statsMessage.setLong("inflightCount", stats.getInflight().getCount());
statsMessage.setDouble("averageMessageSize",stats.getMessageSize().getAverageSize());
statsMessage.setLong("messagesCached", stats.getMessagesCached().getCount()); statsMessage.setLong("messagesCached", stats.getMessagesCached().getCount());
statsMessage.setInt("memoryPercentUsage", systemUsage.getMemoryUsage().getPercentUsage()); statsMessage.setInt("memoryPercentUsage", systemUsage.getMemoryUsage().getPercentUsage());
statsMessage.setLong("memoryUsage", systemUsage.getMemoryUsage().getUsage()); statsMessage.setLong("memoryUsage", systemUsage.getMemoryUsage().getUsage());