mirror of https://github.com/apache/activemq.git
git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/branches/activemq-4.0@470692 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
98269e05cf
commit
b4933dae28
|
@ -59,39 +59,43 @@ public class DestinationView {
|
||||||
}
|
}
|
||||||
|
|
||||||
public void resetStatistics() {
|
public void resetStatistics() {
|
||||||
destination.resetStatistics();
|
destination.getDestinationStatistics().reset();
|
||||||
}
|
}
|
||||||
|
|
||||||
public long getEnqueueCount() {
|
public long getEnqueueCount() {
|
||||||
return destination.getEnqueueCount();
|
return destination.getDestinationStatistics().getEnqueues().getCount();
|
||||||
}
|
}
|
||||||
|
|
||||||
public long getDequeueCount() {
|
public long getDequeueCount() {
|
||||||
return destination.getDequeueCount();
|
return destination.getDestinationStatistics().getDequeues().getCount();
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getDispatchCount() {
|
||||||
|
return destination.getDestinationStatistics().getDispatched().getCount();
|
||||||
}
|
}
|
||||||
|
|
||||||
public long getConsumerCount() {
|
public long getConsumerCount() {
|
||||||
return destination.getConsumerCount();
|
return destination.getDestinationStatistics().getConsumers().getCount();
|
||||||
}
|
}
|
||||||
|
|
||||||
public long getQueueSize() {
|
public long getQueueSize() {
|
||||||
return destination.getQueueSize();
|
return destination.getDestinationStatistics().getMessages().getCount();
|
||||||
}
|
}
|
||||||
|
|
||||||
public long getMessagesCached() {
|
public long getMessagesCached() {
|
||||||
return destination.getMessagesCached();
|
return destination.getDestinationStatistics().getMessagesCached().getCount();
|
||||||
}
|
}
|
||||||
|
|
||||||
public int getMemoryPercentageUsed() {
|
public int getMemoryPercentageUsed() {
|
||||||
return destination.getMemoryPercentageUsed();
|
return destination.getUsageManager().getPercentUsage();
|
||||||
}
|
}
|
||||||
|
|
||||||
public long getMemoryLimit() {
|
public long getMemoryLimit() {
|
||||||
return destination.getMemoryLimit();
|
return destination.getUsageManager().getLimit();
|
||||||
}
|
}
|
||||||
|
|
||||||
public void setMemoryLimit(long limit) {
|
public void setMemoryLimit(long limit) {
|
||||||
destination.setMemoryLimit(limit);
|
destination.getUsageManager().setLimit(limit);
|
||||||
}
|
}
|
||||||
|
|
||||||
public CompositeData[] browse() throws OpenDataException{
|
public CompositeData[] browse() throws OpenDataException{
|
||||||
|
|
|
@ -42,7 +42,12 @@ public interface DestinationViewMBean {
|
||||||
public long getEnqueueCount();
|
public long getEnqueueCount();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @return The number of messages that have been received from the destination.
|
* @return The number of messages that have been delivered (potentially not acknowledged) to consumers.
|
||||||
|
*/
|
||||||
|
public long getDispatchCount();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return The number of messages that have been acknowledged from the destination.
|
||||||
*/
|
*/
|
||||||
public long getDequeueCount();
|
public long getDequeueCount();
|
||||||
|
|
||||||
|
|
|
@ -55,15 +55,5 @@ public interface Destination extends Service {
|
||||||
DeadLetterStrategy getDeadLetterStrategy();
|
DeadLetterStrategy getDeadLetterStrategy();
|
||||||
|
|
||||||
public Message[] browse();
|
public Message[] browse();
|
||||||
|
|
||||||
public void resetStatistics();
|
|
||||||
public String getName();
|
public String getName();
|
||||||
public long getEnqueueCount();
|
|
||||||
public long getDequeueCount();
|
|
||||||
public long getConsumerCount();
|
|
||||||
public long getQueueSize();
|
|
||||||
public long getMessagesCached();
|
|
||||||
public int getMemoryPercentageUsed();
|
|
||||||
public long getMemoryLimit();
|
|
||||||
public void setMemoryLimit(long limit);
|
|
||||||
}
|
}
|
|
@ -35,16 +35,19 @@ public class DestinationStatistics extends StatsImpl {
|
||||||
protected CountStatisticImpl consumers;
|
protected CountStatisticImpl consumers;
|
||||||
protected CountStatisticImpl messages;
|
protected CountStatisticImpl messages;
|
||||||
protected PollCountStatisticImpl messagesCached;
|
protected PollCountStatisticImpl messagesCached;
|
||||||
|
protected CountStatisticImpl dispatched;
|
||||||
|
|
||||||
public DestinationStatistics() {
|
public DestinationStatistics() {
|
||||||
|
|
||||||
enqueues = new CountStatisticImpl("enqueues", "The number of messages that have been sent to the destination");
|
enqueues = new CountStatisticImpl("enqueues", "The number of messages that have been sent to the destination");
|
||||||
dequeues = new CountStatisticImpl("dequeues", "The number of messages that have been dispatched from the destination");
|
dispatched = new CountStatisticImpl("dispatched", "The number of messages that have been dispatched from the destination");
|
||||||
|
dequeues = new CountStatisticImpl("dequeues", "The number of messages that have been acknowledged from the destination");
|
||||||
consumers = new CountStatisticImpl("consumers", "The number of consumers that that are subscribing to messages from the destination");
|
consumers = new CountStatisticImpl("consumers", "The number of consumers that that are subscribing to messages from the destination");
|
||||||
messages = new CountStatisticImpl("messages", "The number of messages that that are being held by the destination");
|
messages = new CountStatisticImpl("messages", "The number of messages that that are being held by the destination");
|
||||||
messagesCached = new PollCountStatisticImpl("messagesCached", "The number of messages that are held in the destination's memory cache");
|
messagesCached = new PollCountStatisticImpl("messagesCached", "The number of messages that are held in the destination's memory cache");
|
||||||
|
|
||||||
addStatistic("enqueues", enqueues);
|
addStatistic("enqueues", enqueues);
|
||||||
|
addStatistic("dispatched", dispatched);
|
||||||
addStatistic("dequeues", dequeues);
|
addStatistic("dequeues", dequeues);
|
||||||
addStatistic("consumers", consumers);
|
addStatistic("consumers", consumers);
|
||||||
addStatistic("messages", messages);
|
addStatistic("messages", messages);
|
||||||
|
@ -75,11 +78,13 @@ public class DestinationStatistics extends StatsImpl {
|
||||||
super.reset();
|
super.reset();
|
||||||
enqueues.reset();
|
enqueues.reset();
|
||||||
dequeues.reset();
|
dequeues.reset();
|
||||||
|
dispatched.reset();
|
||||||
}
|
}
|
||||||
|
|
||||||
public void setParent(DestinationStatistics parent) {
|
public void setParent(DestinationStatistics parent) {
|
||||||
if (parent != null) {
|
if (parent != null) {
|
||||||
enqueues.setParent(parent.enqueues);
|
enqueues.setParent(parent.enqueues);
|
||||||
|
dispatched.setParent(parent.dispatched);
|
||||||
dequeues.setParent(parent.dequeues);
|
dequeues.setParent(parent.dequeues);
|
||||||
consumers.setParent(parent.consumers);
|
consumers.setParent(parent.consumers);
|
||||||
messagesCached.setParent(parent.messagesCached);
|
messagesCached.setParent(parent.messagesCached);
|
||||||
|
@ -87,6 +92,7 @@ public class DestinationStatistics extends StatsImpl {
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
enqueues.setParent(null);
|
enqueues.setParent(null);
|
||||||
|
dispatched.setParent(null);
|
||||||
dequeues.setParent(null);
|
dequeues.setParent(null);
|
||||||
consumers.setParent(null);
|
consumers.setParent(null);
|
||||||
messagesCached.setParent(null);
|
messagesCached.setParent(null);
|
||||||
|
@ -98,15 +104,7 @@ public class DestinationStatistics extends StatsImpl {
|
||||||
this.messagesCached = messagesCached;
|
this.messagesCached = messagesCached;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
public CountStatisticImpl getDispatched() {
|
||||||
* Called when a message is enqueued to update the statistics.
|
return dispatched;
|
||||||
*/
|
|
||||||
public void onMessageEnqueue(Message message) {
|
|
||||||
getEnqueues().increment();
|
|
||||||
getMessages().increment();
|
|
||||||
}
|
|
||||||
|
|
||||||
public void onMessageDequeue(Message message) {
|
|
||||||
getDequeues().increment();
|
|
||||||
}
|
}
|
||||||
}
|
}
|
|
@ -108,6 +108,7 @@ abstract public class PrefetchSubscription extends AbstractSubscription{
|
||||||
// Don't remove the nodes until we are committed.
|
// Don't remove the nodes until we are committed.
|
||||||
if(!context.isInTransaction()){
|
if(!context.isInTransaction()){
|
||||||
dequeueCounter++;
|
dequeueCounter++;
|
||||||
|
node.getRegionDestination().getDestinationStatistics().getDequeues().increment();
|
||||||
iter.remove();
|
iter.remove();
|
||||||
}else{
|
}else{
|
||||||
// setup a Synchronization to remove nodes from the dispatched list.
|
// setup a Synchronization to remove nodes from the dispatched list.
|
||||||
|
@ -116,6 +117,7 @@ abstract public class PrefetchSubscription extends AbstractSubscription{
|
||||||
synchronized(PrefetchSubscription.this){
|
synchronized(PrefetchSubscription.this){
|
||||||
dequeueCounter++;
|
dequeueCounter++;
|
||||||
dispatched.remove(node);
|
dispatched.remove(node);
|
||||||
|
node.getRegionDestination().getDestinationStatistics().getDequeues().increment();
|
||||||
prefetchExtension--;
|
prefetchExtension--;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -165,6 +167,7 @@ abstract public class PrefetchSubscription extends AbstractSubscription{
|
||||||
}
|
}
|
||||||
if(inAckRange){
|
if(inAckRange){
|
||||||
sendToDLQ(context, node);
|
sendToDLQ(context, node);
|
||||||
|
node.getRegionDestination().getDestinationStatistics().getDequeues().increment();
|
||||||
iter.remove();
|
iter.remove();
|
||||||
dequeueCounter++;
|
dequeueCounter++;
|
||||||
index++;
|
index++;
|
||||||
|
@ -319,7 +322,7 @@ abstract public class PrefetchSubscription extends AbstractSubscription{
|
||||||
|
|
||||||
synchronized protected void onDispatch(final MessageReference node,final Message message){
|
synchronized protected void onDispatch(final MessageReference node,final Message message){
|
||||||
if(node.getRegionDestination()!=null){
|
if(node.getRegionDestination()!=null){
|
||||||
node.getRegionDestination().getDestinationStatistics().onMessageDequeue(message);
|
node.getRegionDestination().getDestinationStatistics().getDispatched().increment();
|
||||||
context.getConnection().getStatistics().onMessageDequeue(message);
|
context.getConnection().getStatistics().onMessageDequeue(message);
|
||||||
try{
|
try{
|
||||||
dispatchMatched();
|
dispatchMatched();
|
||||||
|
|
|
@ -393,47 +393,10 @@ public class Queue implements Destination {
|
||||||
this.messageGroupHashBucketCount = messageGroupHashBucketCount;
|
this.messageGroupHashBucketCount = messageGroupHashBucketCount;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void resetStatistics() {
|
|
||||||
getDestinationStatistics().reset();
|
|
||||||
}
|
|
||||||
|
|
||||||
public String getName() {
|
public String getName() {
|
||||||
return getActiveMQDestination().getPhysicalName();
|
return getActiveMQDestination().getPhysicalName();
|
||||||
}
|
}
|
||||||
|
|
||||||
public long getEnqueueCount() {
|
|
||||||
return getDestinationStatistics().getEnqueues().getCount();
|
|
||||||
}
|
|
||||||
|
|
||||||
public long getDequeueCount() {
|
|
||||||
return getDestinationStatistics().getDequeues().getCount();
|
|
||||||
}
|
|
||||||
|
|
||||||
public long getConsumerCount() {
|
|
||||||
return getDestinationStatistics().getConsumers().getCount();
|
|
||||||
}
|
|
||||||
|
|
||||||
public long getQueueSize() {
|
|
||||||
return getDestinationStatistics().getMessages().getCount();
|
|
||||||
}
|
|
||||||
|
|
||||||
public long getMessagesCached() {
|
|
||||||
return getDestinationStatistics().getMessagesCached().getCount();
|
|
||||||
}
|
|
||||||
|
|
||||||
public int getMemoryPercentageUsed() {
|
|
||||||
return getUsageManager().getPercentUsage();
|
|
||||||
}
|
|
||||||
|
|
||||||
public long getMemoryLimit() {
|
|
||||||
return getUsageManager().getLimit();
|
|
||||||
}
|
|
||||||
|
|
||||||
public void setMemoryLimit(long limit) {
|
|
||||||
getUsageManager().setLimit(limit);
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
// Implementation methods
|
// Implementation methods
|
||||||
// -------------------------------------------------------------------------
|
// -------------------------------------------------------------------------
|
||||||
private MessageReference createMessageReference(Message message) {
|
private MessageReference createMessageReference(Message message) {
|
||||||
|
@ -445,7 +408,8 @@ public class Queue implements Destination {
|
||||||
dispatchValve.increment();
|
dispatchValve.increment();
|
||||||
MessageEvaluationContext msgContext = context.getMessageEvaluationContext();
|
MessageEvaluationContext msgContext = context.getMessageEvaluationContext();
|
||||||
try {
|
try {
|
||||||
destinationStatistics.onMessageEnqueue(message);
|
destinationStatistics.getEnqueues().increment();
|
||||||
|
destinationStatistics.getMessages().increment();
|
||||||
synchronized (messages) {
|
synchronized (messages) {
|
||||||
messages.add(node);
|
messages.add(node);
|
||||||
}
|
}
|
||||||
|
|
|
@ -380,46 +380,10 @@ public class Topic implements Destination {
|
||||||
this.deadLetterStrategy = deadLetterStrategy;
|
this.deadLetterStrategy = deadLetterStrategy;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void resetStatistics(){
|
|
||||||
getDestinationStatistics().reset();
|
|
||||||
}
|
|
||||||
|
|
||||||
public String getName() {
|
public String getName() {
|
||||||
return getActiveMQDestination().getPhysicalName();
|
return getActiveMQDestination().getPhysicalName();
|
||||||
}
|
}
|
||||||
|
|
||||||
public long getEnqueueCount() {
|
|
||||||
return getDestinationStatistics().getEnqueues().getCount();
|
|
||||||
}
|
|
||||||
|
|
||||||
public long getDequeueCount() {
|
|
||||||
return getDestinationStatistics().getDequeues().getCount();
|
|
||||||
}
|
|
||||||
|
|
||||||
public long getConsumerCount() {
|
|
||||||
return getDestinationStatistics().getConsumers().getCount();
|
|
||||||
}
|
|
||||||
|
|
||||||
public long getQueueSize() {
|
|
||||||
return getDestinationStatistics().getMessages().getCount();
|
|
||||||
}
|
|
||||||
|
|
||||||
public long getMessagesCached() {
|
|
||||||
return getDestinationStatistics().getMessagesCached().getCount();
|
|
||||||
}
|
|
||||||
|
|
||||||
public int getMemoryPercentageUsed() {
|
|
||||||
return getUsageManager().getPercentUsage();
|
|
||||||
}
|
|
||||||
|
|
||||||
public long getMemoryLimit() {
|
|
||||||
return getUsageManager().getLimit();
|
|
||||||
}
|
|
||||||
|
|
||||||
public void setMemoryLimit(long limit) {
|
|
||||||
getUsageManager().setLimit(limit);
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
// Implementation methods
|
// Implementation methods
|
||||||
// -------------------------------------------------------------------------
|
// -------------------------------------------------------------------------
|
||||||
|
|
|
@ -60,6 +60,9 @@ public class TopicSubscription extends AbstractSubscription{
|
||||||
private final AtomicLong enqueueCounter = new AtomicLong(0);
|
private final AtomicLong enqueueCounter = new AtomicLong(0);
|
||||||
private final AtomicLong dequeueCounter = new AtomicLong(0);
|
private final AtomicLong dequeueCounter = new AtomicLong(0);
|
||||||
|
|
||||||
|
boolean singleDestination=true;
|
||||||
|
Destination destination;
|
||||||
|
|
||||||
public TopicSubscription(Broker broker,ConnectionContext context,ConsumerInfo info,UsageManager usageManager)
|
public TopicSubscription(Broker broker,ConnectionContext context,ConsumerInfo info,UsageManager usageManager)
|
||||||
throws InvalidSelectorException{
|
throws InvalidSelectorException{
|
||||||
super(broker,context,info);
|
super(broker,context,info);
|
||||||
|
@ -146,12 +149,22 @@ public class TopicSubscription extends AbstractSubscription{
|
||||||
delivered.addAndGet(ack.getMessageCount());
|
delivered.addAndGet(ack.getMessageCount());
|
||||||
context.getTransaction().addSynchronization(new Synchronization(){
|
context.getTransaction().addSynchronization(new Synchronization(){
|
||||||
public void afterCommit() throws Exception{
|
public void afterCommit() throws Exception{
|
||||||
|
synchronized( TopicSubscription.this ) {
|
||||||
|
if( singleDestination ) {
|
||||||
|
destination.getDestinationStatistics().getDequeues().add(ack.getMessageCount());
|
||||||
|
}
|
||||||
|
}
|
||||||
dequeueCounter.addAndGet(ack.getMessageCount());
|
dequeueCounter.addAndGet(ack.getMessageCount());
|
||||||
dispatched.addAndGet(-ack.getMessageCount());
|
dispatched.addAndGet(-ack.getMessageCount());
|
||||||
delivered.set(Math.max(0,delivered.get()-ack.getMessageCount()));
|
delivered.set(Math.max(0,delivered.get()-ack.getMessageCount()));
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}else{
|
}else{
|
||||||
|
|
||||||
|
if( singleDestination ) {
|
||||||
|
destination.getDestinationStatistics().getDequeues().add(ack.getMessageCount());
|
||||||
|
}
|
||||||
|
|
||||||
dequeueCounter.addAndGet(ack.getMessageCount());
|
dequeueCounter.addAndGet(ack.getMessageCount());
|
||||||
dispatched.addAndGet(-ack.getMessageCount());
|
dispatched.addAndGet(-ack.getMessageCount());
|
||||||
delivered.set(Math.max(0,delivered.get()-ack.getMessageCount()));
|
delivered.set(Math.max(0,delivered.get()-ack.getMessageCount()));
|
||||||
|
@ -308,15 +321,29 @@ public class TopicSubscription extends AbstractSubscription{
|
||||||
md.setConsumerId(info.getConsumerId());
|
md.setConsumerId(info.getConsumerId());
|
||||||
md.setDestination(node.getRegionDestination().getActiveMQDestination());
|
md.setDestination(node.getRegionDestination().getActiveMQDestination());
|
||||||
dispatched.incrementAndGet();
|
dispatched.incrementAndGet();
|
||||||
|
|
||||||
|
// Keep track if this subscription is receiving messages from a single destination.
|
||||||
|
if( singleDestination ) {
|
||||||
|
if( destination == null ) {
|
||||||
|
destination = node.getRegionDestination();
|
||||||
|
} else {
|
||||||
|
if( destination != node.getRegionDestination() ) {
|
||||||
|
singleDestination = false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if(info.isDispatchAsync()){
|
if(info.isDispatchAsync()){
|
||||||
md.setConsumer(new Runnable(){
|
md.setConsumer(new Runnable(){
|
||||||
public void run(){
|
public void run(){
|
||||||
|
node.getRegionDestination().getDestinationStatistics().getDispatched().increment();
|
||||||
node.decrementReferenceCount();
|
node.decrementReferenceCount();
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
context.getConnection().dispatchAsync(md);
|
context.getConnection().dispatchAsync(md);
|
||||||
}else{
|
}else{
|
||||||
context.getConnection().dispatchSync(md);
|
context.getConnection().dispatchSync(md);
|
||||||
|
node.getRegionDestination().getDestinationStatistics().getDispatched().increment();
|
||||||
node.decrementReferenceCount();
|
node.decrementReferenceCount();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue