git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@470695 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Jonas B. Lim 2006-11-03 07:15:37 +00:00
parent 9c3dc347ca
commit 36de58b371
9 changed files with 62 additions and 141 deletions

View File

@ -69,39 +69,43 @@ public class DestinationView implements DestinationViewMBean {
} }
public void resetStatistics() { public void resetStatistics() {
destination.resetStatistics(); destination.getDestinationStatistics().reset();
} }
public long getEnqueueCount() { public long getEnqueueCount() {
return destination.getEnqueueCount(); return destination.getDestinationStatistics().getEnqueues().getCount();
} }
public long getDequeueCount() { public long getDequeueCount() {
return destination.getDequeueCount(); return destination.getDestinationStatistics().getDequeues().getCount();
}
public long getDispatchCount() {
return destination.getDestinationStatistics().getDispatched().getCount();
} }
public long getConsumerCount() { public long getConsumerCount() {
return destination.getConsumerCount(); return destination.getDestinationStatistics().getConsumers().getCount();
} }
public long getQueueSize() { public long getQueueSize() {
return destination.getQueueSize(); return destination.getDestinationStatistics().getMessages().getCount();
} }
public long getMessagesCached() { public long getMessagesCached() {
return destination.getMessagesCached(); return destination.getDestinationStatistics().getMessagesCached().getCount();
} }
public int getMemoryPercentageUsed() { public int getMemoryPercentageUsed() {
return destination.getMemoryPercentageUsed(); return destination.getUsageManager().getPercentUsage();
} }
public long getMemoryLimit() { public long getMemoryLimit() {
return destination.getMemoryLimit(); return destination.getUsageManager().getLimit();
} }
public void setMemoryLimit(long limit) { public void setMemoryLimit(long limit) {
destination.setMemoryLimit(limit); destination.getUsageManager().setLimit(limit);
} }
public CompositeData[] browse() throws OpenDataException{ public CompositeData[] browse() throws OpenDataException{

View File

@ -44,7 +44,12 @@ public interface DestinationViewMBean {
public long getEnqueueCount(); public long getEnqueueCount();
/** /**
* @return The number of messages that have been received from the destination. * @return The number of messages that have been delivered (potentially not acknowledged) to consumers.
*/
public long getDispatchCount();
/**
* @return The number of messages that have been acknowledged from the destination.
*/ */
public long getDequeueCount(); public long getDequeueCount();

View File

@ -51,15 +51,5 @@ public interface Destination extends Service {
DeadLetterStrategy getDeadLetterStrategy(); DeadLetterStrategy getDeadLetterStrategy();
public Message[] browse(); public Message[] browse();
public void resetStatistics();
public String getName(); public String getName();
public long getEnqueueCount();
public long getDequeueCount();
public long getConsumerCount();
public long getQueueSize();
public long getMessagesCached();
public int getMemoryPercentageUsed();
public long getMemoryLimit();
public void setMemoryLimit(long limit);
} }

View File

@ -65,46 +65,18 @@ public class DestinationFilter implements Destination {
return next.getActiveMQDestination(); return next.getActiveMQDestination();
} }
public long getConsumerCount() {
return next.getConsumerCount();
}
public DeadLetterStrategy getDeadLetterStrategy() { public DeadLetterStrategy getDeadLetterStrategy() {
return next.getDeadLetterStrategy(); return next.getDeadLetterStrategy();
} }
public long getDequeueCount() {
return next.getDequeueCount();
}
public DestinationStatistics getDestinationStatistics() { public DestinationStatistics getDestinationStatistics() {
return next.getDestinationStatistics(); return next.getDestinationStatistics();
} }
public long getEnqueueCount() {
return next.getEnqueueCount();
}
public long getMemoryLimit() {
return next.getMemoryLimit();
}
public int getMemoryPercentageUsed() {
return next.getMemoryPercentageUsed();
}
public long getMessagesCached() {
return next.getMessagesCached();
}
public String getName() { public String getName() {
return next.getName(); return next.getName();
} }
public long getQueueSize() {
return next.getQueueSize();
}
public UsageManager getUsageManager() { public UsageManager getUsageManager() {
return next.getUsageManager(); return next.getUsageManager();
} }
@ -117,18 +89,10 @@ public class DestinationFilter implements Destination {
next.removeSubscription(context, sub); next.removeSubscription(context, sub);
} }
public void resetStatistics() {
next.resetStatistics();
}
public void send(ConnectionContext context, Message messageSend) throws Exception { public void send(ConnectionContext context, Message messageSend) throws Exception {
next.send(context, messageSend); next.send(context, messageSend);
} }
public void setMemoryLimit(long limit) {
next.setMemoryLimit(limit);
}
public void start() throws Exception { public void start() throws Exception {
next.start(); next.start();
} }

View File

@ -35,16 +35,19 @@ public class DestinationStatistics extends StatsImpl {
protected CountStatisticImpl consumers; protected CountStatisticImpl consumers;
protected CountStatisticImpl messages; protected CountStatisticImpl messages;
protected PollCountStatisticImpl messagesCached; protected PollCountStatisticImpl messagesCached;
protected CountStatisticImpl dispatched;
public DestinationStatistics() { public DestinationStatistics() {
enqueues = new CountStatisticImpl("enqueues", "The number of messages that have been sent to the destination"); enqueues = new CountStatisticImpl("enqueues", "The number of messages that have been sent to the destination");
dequeues = new CountStatisticImpl("dequeues", "The number of messages that have been dispatched from the destination"); dispatched = new CountStatisticImpl("dispatched", "The number of messages that have been dispatched from the destination");
dequeues = new CountStatisticImpl("dequeues", "The number of messages that have been acknowledged from the destination");
consumers = new CountStatisticImpl("consumers", "The number of consumers that that are subscribing to messages from the destination"); consumers = new CountStatisticImpl("consumers", "The number of consumers that that are subscribing to messages from the destination");
messages = new CountStatisticImpl("messages", "The number of messages that that are being held by the destination"); messages = new CountStatisticImpl("messages", "The number of messages that that are being held by the destination");
messagesCached = new PollCountStatisticImpl("messagesCached", "The number of messages that are held in the destination's memory cache"); messagesCached = new PollCountStatisticImpl("messagesCached", "The number of messages that are held in the destination's memory cache");
addStatistic("enqueues", enqueues); addStatistic("enqueues", enqueues);
addStatistic("dispatched", dispatched);
addStatistic("dequeues", dequeues); addStatistic("dequeues", dequeues);
addStatistic("consumers", consumers); addStatistic("consumers", consumers);
addStatistic("messages", messages); addStatistic("messages", messages);
@ -75,11 +78,13 @@ public class DestinationStatistics extends StatsImpl {
super.reset(); super.reset();
enqueues.reset(); enqueues.reset();
dequeues.reset(); dequeues.reset();
dispatched.reset();
} }
public void setParent(DestinationStatistics parent) { public void setParent(DestinationStatistics parent) {
if (parent != null) { if (parent != null) {
enqueues.setParent(parent.enqueues); enqueues.setParent(parent.enqueues);
dispatched.setParent(parent.dispatched);
dequeues.setParent(parent.dequeues); dequeues.setParent(parent.dequeues);
consumers.setParent(parent.consumers); consumers.setParent(parent.consumers);
messagesCached.setParent(parent.messagesCached); messagesCached.setParent(parent.messagesCached);
@ -87,6 +92,7 @@ public class DestinationStatistics extends StatsImpl {
} }
else { else {
enqueues.setParent(null); enqueues.setParent(null);
dispatched.setParent(null);
dequeues.setParent(null); dequeues.setParent(null);
consumers.setParent(null); consumers.setParent(null);
messagesCached.setParent(null); messagesCached.setParent(null);
@ -98,15 +104,7 @@ public class DestinationStatistics extends StatsImpl {
this.messagesCached = messagesCached; this.messagesCached = messagesCached;
} }
/** public CountStatisticImpl getDispatched() {
* Called when a message is enqueued to update the statistics. return dispatched;
*/
public void onMessageEnqueue(Message message) {
getEnqueues().increment();
getMessages().increment();
}
public void onMessageDequeue(Message message) {
getDequeues().increment();
} }
} }

View File

@ -170,6 +170,7 @@ abstract public class PrefetchSubscription extends AbstractSubscription{
// Don't remove the nodes until we are committed. // Don't remove the nodes until we are committed.
if(!context.isInTransaction()){ if(!context.isInTransaction()){
dequeueCounter++; dequeueCounter++;
node.getRegionDestination().getDestinationStatistics().getDequeues().increment();
iter.remove(); iter.remove();
}else{ }else{
// setup a Synchronization to remove nodes from the dispatched list. // setup a Synchronization to remove nodes from the dispatched list.
@ -178,6 +179,7 @@ abstract public class PrefetchSubscription extends AbstractSubscription{
synchronized(PrefetchSubscription.this){ synchronized(PrefetchSubscription.this){
dequeueCounter++; dequeueCounter++;
dispatched.remove(node); dispatched.remove(node);
node.getRegionDestination().getDestinationStatistics().getDequeues().increment();
prefetchExtension--; prefetchExtension--;
} }
} }
@ -230,6 +232,7 @@ abstract public class PrefetchSubscription extends AbstractSubscription{
} }
if(inAckRange){ if(inAckRange){
sendToDLQ(context, node); sendToDLQ(context, node);
node.getRegionDestination().getDestinationStatistics().getDequeues().increment();
iter.remove(); iter.remove();
dequeueCounter++; dequeueCounter++;
index++; index++;
@ -406,7 +409,7 @@ abstract public class PrefetchSubscription extends AbstractSubscription{
synchronized protected void onDispatch(final MessageReference node,final Message message){ synchronized protected void onDispatch(final MessageReference node,final Message message){
if(node.getRegionDestination()!=null){ if(node.getRegionDestination()!=null){
if( node != QueueMessageReference.NULL_MESSAGE ) { if( node != QueueMessageReference.NULL_MESSAGE ) {
node.getRegionDestination().getDestinationStatistics().onMessageDequeue(message); node.getRegionDestination().getDestinationStatistics().getDispatched().increment();
context.getConnection().getStatistics().onMessageDequeue(message); context.getConnection().getStatistics().onMessageDequeue(message);
} }
try{ try{

View File

@ -431,45 +431,10 @@ public class Queue implements Destination {
this.messageGroupMapFactory = messageGroupMapFactory; this.messageGroupMapFactory = messageGroupMapFactory;
} }
public void resetStatistics() {
getDestinationStatistics().reset();
}
public String getName() { public String getName() {
return getActiveMQDestination().getPhysicalName(); return getActiveMQDestination().getPhysicalName();
} }
public long getEnqueueCount() {
return getDestinationStatistics().getEnqueues().getCount();
}
public long getDequeueCount() {
return getDestinationStatistics().getDequeues().getCount();
}
public long getConsumerCount() {
return getDestinationStatistics().getConsumers().getCount();
}
public long getQueueSize() {
return getDestinationStatistics().getMessages().getCount();
}
public long getMessagesCached() {
return getDestinationStatistics().getMessagesCached().getCount();
}
public int getMemoryPercentageUsed() {
return getUsageManager().getPercentUsage();
}
public long getMemoryLimit() {
return getUsageManager().getLimit();
}
public void setMemoryLimit(long limit) {
getUsageManager().setLimit(limit);
}
public PendingMessageCursor getMessages(){ public PendingMessageCursor getMessages(){
return this.messages; return this.messages;
} }
@ -488,7 +453,8 @@ public class Queue implements Destination {
dispatchValve.increment(); dispatchValve.increment();
MessageEvaluationContext msgContext = context.getMessageEvaluationContext(); MessageEvaluationContext msgContext = context.getMessageEvaluationContext();
try { try {
destinationStatistics.onMessageEnqueue(message); destinationStatistics.getEnqueues().increment();
destinationStatistics.getMessages().increment();
synchronized (messages) { synchronized (messages) {
messages.addMessageLast(node); messages.addMessageLast(node);
} }

View File

@ -385,46 +385,10 @@ public class Topic implements Destination {
this.deadLetterStrategy = deadLetterStrategy; this.deadLetterStrategy = deadLetterStrategy;
} }
public void resetStatistics(){
getDestinationStatistics().reset();
}
public String getName() { public String getName() {
return getActiveMQDestination().getPhysicalName(); return getActiveMQDestination().getPhysicalName();
} }
public long getEnqueueCount() {
return getDestinationStatistics().getEnqueues().getCount();
}
public long getDequeueCount() {
return getDestinationStatistics().getDequeues().getCount();
}
public long getConsumerCount() {
return getDestinationStatistics().getConsumers().getCount();
}
public long getQueueSize() {
return getDestinationStatistics().getMessages().getCount();
}
public long getMessagesCached() {
return getDestinationStatistics().getMessagesCached().getCount();
}
public int getMemoryPercentageUsed() {
return getUsageManager().getPercentUsage();
}
public long getMemoryLimit() {
return getUsageManager().getLimit();
}
public void setMemoryLimit(long limit) {
getUsageManager().setLimit(limit);
}
// Implementation methods // Implementation methods
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------

View File

@ -62,6 +62,9 @@ public class TopicSubscription extends AbstractSubscription{
private final AtomicLong enqueueCounter = new AtomicLong(0); private final AtomicLong enqueueCounter = new AtomicLong(0);
private final AtomicLong dequeueCounter = new AtomicLong(0); private final AtomicLong dequeueCounter = new AtomicLong(0);
boolean singleDestination=true;
Destination destination;
public TopicSubscription(Broker broker,ConnectionContext context,ConsumerInfo info,UsageManager usageManager) public TopicSubscription(Broker broker,ConnectionContext context,ConsumerInfo info,UsageManager usageManager)
throws InvalidSelectorException{ throws InvalidSelectorException{
super(broker,context,info); super(broker,context,info);
@ -158,12 +161,22 @@ public class TopicSubscription extends AbstractSubscription{
delivered.addAndGet(ack.getMessageCount()); delivered.addAndGet(ack.getMessageCount());
context.getTransaction().addSynchronization(new Synchronization(){ context.getTransaction().addSynchronization(new Synchronization(){
public void afterCommit() throws Exception{ public void afterCommit() throws Exception{
synchronized( TopicSubscription.this ) {
if( singleDestination ) {
destination.getDestinationStatistics().getDequeues().add(ack.getMessageCount());
}
}
dequeueCounter.addAndGet(ack.getMessageCount()); dequeueCounter.addAndGet(ack.getMessageCount());
dispatched.addAndGet(-ack.getMessageCount()); dispatched.addAndGet(-ack.getMessageCount());
delivered.set(Math.max(0,delivered.get()-ack.getMessageCount())); delivered.set(Math.max(0,delivered.get()-ack.getMessageCount()));
} }
}); });
}else{ }else{
if( singleDestination ) {
destination.getDestinationStatistics().getDequeues().add(ack.getMessageCount());
}
dequeueCounter.addAndGet(ack.getMessageCount()); dequeueCounter.addAndGet(ack.getMessageCount());
dispatched.addAndGet(-ack.getMessageCount()); dispatched.addAndGet(-ack.getMessageCount());
delivered.set(Math.max(0,delivered.get()-ack.getMessageCount())); delivered.set(Math.max(0,delivered.get()-ack.getMessageCount()));
@ -325,15 +338,29 @@ public class TopicSubscription extends AbstractSubscription{
md.setConsumerId(info.getConsumerId()); md.setConsumerId(info.getConsumerId());
md.setDestination(node.getRegionDestination().getActiveMQDestination()); md.setDestination(node.getRegionDestination().getActiveMQDestination());
dispatched.incrementAndGet(); dispatched.incrementAndGet();
// Keep track if this subscription is receiving messages from a single destination.
if( singleDestination ) {
if( destination == null ) {
destination = node.getRegionDestination();
} else {
if( destination != node.getRegionDestination() ) {
singleDestination = false;
}
}
}
if(info.isDispatchAsync()){ if(info.isDispatchAsync()){
md.setConsumer(new Runnable(){ md.setConsumer(new Runnable(){
public void run(){ public void run(){
node.getRegionDestination().getDestinationStatistics().getDispatched().increment();
node.decrementReferenceCount(); node.decrementReferenceCount();
} }
}); });
context.getConnection().dispatchAsync(md); context.getConnection().dispatchAsync(md);
}else{ }else{
context.getConnection().dispatchSync(md); context.getConnection().dispatchSync(md);
node.getRegionDestination().getDestinationStatistics().getDispatched().increment();
node.decrementReferenceCount(); node.decrementReferenceCount();
} }
} }