git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@661224 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2008-05-29 06:41:35 +00:00
parent b3ee59d7a1
commit 4dde3239ff
8 changed files with 220 additions and 11 deletions

View File

@ -37,6 +37,7 @@ import javax.management.openmbean.TabularType;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.jmx.OpenTypeSupport.OpenTypeFactory;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.Queue;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQTextMessage;
@ -317,5 +318,21 @@ public class DestinationView implements DestinationViewMBean {
public void setProducerFlowControl(boolean producerFlowControl) {
destination.setProducerFlowControl(producerFlowControl);
}
public int getMaxPageSize() {
return destination.getMaxPageSize();
}
public void setMaxPageSize(int pageSize) {
destination.setMaxPageSize(pageSize);
}
public boolean isUseCache() {
return destination.isUseCache();
}
public void setUseCache(boolean value) {
destination.setUseCache(value);
}
}

View File

@ -214,5 +214,28 @@ public interface DestinationViewMBean {
* @param maxAuditDepth the maxAuditDepth to set
*/
public void setMaxAuditDepth(int maxAuditDepth);
/**
* @return the maximum number of message to be paged into the
* destination
*/
public int getMaxPageSize();
/**
* @param pageSize
* Set the maximum number of messages to page into the destination
*/
public void setMaxPageSize(int pageSize);
/**
* @return true if caching is enabled of for the destination
*/
public boolean isUseCache();
/**
* @param value
* enable/disable caching on the destination
*/
public void setUseCache(boolean value);
}

View File

@ -21,6 +21,7 @@ import javax.management.openmbean.OpenDataException;
import javax.management.openmbean.TabularData;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.DurableTopicSubscription;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.command.RemoveSubscriptionInfo;
@ -31,6 +32,7 @@ public class DurableSubscriptionView extends SubscriptionView implements Durable
protected ManagedRegionBroker broker;
protected String subscriptionName;
protected DurableTopicSubscription durableSub;
/**
* Constructor
@ -41,6 +43,7 @@ public class DurableSubscriptionView extends SubscriptionView implements Durable
public DurableSubscriptionView(ManagedRegionBroker broker, String clientId, Subscription sub) {
super(clientId, sub);
this.broker = broker;
this.durableSub=(DurableTopicSubscription) sub;
this.subscriptionName = sub.getConsumerInfo().getSubscriptionName();
}
@ -86,6 +89,55 @@ public class DurableSubscriptionView extends SubscriptionView implements Durable
}
public String toString() {
return "InactiveDurableSubscriptionView: " + getClientId() + ":" + getSubscriptionName();
return "ActiveDurableSubscriptionView: " + getClientId() + ":" + getSubscriptionName();
}
public int cursorSize() {
if (durableSub != null && durableSub.getPending() != null) {
return durableSub.getPending().size();
}
return 0;
}
public boolean doesCursorHaveMessagesBuffered() {
if (durableSub != null && durableSub.getPending() != null) {
return durableSub.getPending().hasMessagesBufferedToDeliver();
}
return false;
}
public boolean doesCursorHaveSpace() {
if (durableSub != null && durableSub.getPending() != null) {
return durableSub.getPending().hasSpace();
}
return false;
}
/* (non-Javadoc)
* @see org.apache.activemq.broker.jmx.DurableSubscriptionViewMBean#getCursorMemoryUsage()
*/
public long getCursorMemoryUsage() {
if (durableSub != null && durableSub.getPending() != null && durableSub.getPending().getSystemUsage()!=null) {
return durableSub.getPending().getSystemUsage().getMemoryUsage().getUsage();
}
return 0;
}
public int getCursorPercentUsage() {
if (durableSub != null && durableSub.getPending() != null && durableSub.getPending().getSystemUsage()!=null) {
return durableSub.getPending().getSystemUsage().getMemoryUsage().getPercentUsage();
}
return 0;
}
public boolean isCursorFull() {
if (durableSub != null && durableSub.getPending() != null) {
return durableSub.getPending().isFull();
}
return false;
}
}

View File

@ -50,4 +50,37 @@ public interface DurableSubscriptionViewMBean extends SubscriptionViewMBean {
* stored for this subscription
*/
void destroy() throws Exception;
/**
* @return true if the message cursor has memory space available
* to page in more messages
*/
public boolean doesCursorHaveSpace();
/**
* @return true if the cursor has reached its memory limit for
* paged in messages
*/
public boolean isCursorFull();
/**
* @return true if the cursor has messages buffered to deliver
*/
public boolean doesCursorHaveMessagesBuffered();
/**
* @return the cursor memory usage in bytes
*/
public long getCursorMemoryUsage();
/**
* @return the cursor memory usage as a percentage
*/
public int getCursorPercentUsage();
/**
* @return the number of messages available to be paged in
* by the cursor
*/
public int cursorSize();
}

View File

@ -25,12 +25,10 @@ import org.apache.activemq.command.RemoveSubscriptionInfo;
import org.apache.activemq.command.SubscriptionInfo;
/**
* TODO why does this class not inherit from DurableSubscriptionView?
*
* @version $Revision: 1.5 $
*/
public class InactiveDurableSubscriptionView extends SubscriptionView implements DurableSubscriptionViewMBean {
protected ManagedRegionBroker broker;
public class InactiveDurableSubscriptionView extends DurableSubscriptionView implements DurableSubscriptionViewMBean {
protected SubscriptionInfo subscriptionInfo;
/**
@ -41,7 +39,7 @@ public class InactiveDurableSubscriptionView extends SubscriptionView implements
* @param sub
*/
public InactiveDurableSubscriptionView(ManagedRegionBroker broker, String clientId, SubscriptionInfo sub) {
super(clientId, null);
super(broker,clientId, null);
this.broker = broker;
this.subscriptionInfo = sub;
}

View File

@ -92,4 +92,56 @@ public class QueueView extends DestinationView implements QueueViewMBean {
return ((Queue)destination).moveMatchingMessagesTo(context, selector, toDestination, maximumMessages);
}
public int cursorSize() {
Queue queue = (Queue) destination;
if (queue.getMessages() != null){
return queue.getMessages().size();
}
return 0;
}
public boolean doesCursorHaveMessagesBuffered() {
Queue queue = (Queue) destination;
if (queue.getMessages() != null){
return queue.getMessages().hasMessagesBufferedToDeliver();
}
return false;
}
public boolean doesCursorHaveSpace() {
Queue queue = (Queue) destination;
if (queue.getMessages() != null){
return queue.getMessages().hasSpace();
}
return false;
}
public long getCursorMemoryUsage() {
Queue queue = (Queue) destination;
if (queue.getMessages() != null && queue.getMessages().getSystemUsage() != null){
return queue.getMessages().getSystemUsage().getMemoryUsage().getUsage();
}
return 0;
}
public int getCursorPercentUsage() {
Queue queue = (Queue) destination;
if (queue.getMessages() != null && queue.getMessages().getSystemUsage() != null){
return queue.getMessages().getSystemUsage().getMemoryUsage().getPercentUsage();
}
return 0;
}
public boolean isCursorFull() {
Queue queue = (Queue) destination;
if (queue.getMessages() != null){
return queue.getMessages().isFull();
}
return false;
}
}

View File

@ -112,5 +112,38 @@ public interface QueueViewMBean extends DestinationViewMBean {
* of matched messages
*/
int moveMatchingMessagesTo(String selector, String destinationName, int maximumMessages) throws Exception;
/**
* @return true if the message cursor has memory space available
* to page in more messages
*/
public boolean doesCursorHaveSpace();
/**
* @return true if the cursor has reached its memory limit for
* paged in messages
*/
public boolean isCursorFull();
/**
* @return true if the cursor has messages buffered to deliver
*/
public boolean doesCursorHaveMessagesBuffered();
/**
* @return the cursor memory usage in bytes
*/
public long getCursorMemoryUsage();
/**
* @return the cursor memory usage as a percentage
*/
public int getCursorPercentUsage();
/**
* @return the number of messages available to be paged in
* by the cursor
*/
public int cursorSize();
}

View File

@ -19,14 +19,11 @@ package org.apache.activemq.broker.region.cursors;
import java.io.IOException;
import java.util.LinkedList;
import java.util.List;
import org.apache.activemq.ActiveMQMessageAudit;
import org.apache.activemq.Service;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.broker.region.QueueMessageReference;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.usage.SystemUsage;
/**
@ -194,6 +191,11 @@ public interface PendingMessageCursor extends Service {
* @return true if the cursor is full
*/
boolean isFull();
/**
* @return true if the cursor has space to page messages into
*/
public boolean hasSpace();
/**
* @return true if the cursor has buffered messages ready to deliver
@ -280,6 +282,5 @@ public interface PendingMessageCursor extends Service {
* @return true if a cache is being used
*/
public boolean isUseCache();
}