Added mbeans for Subscribers

git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@381927 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2006-03-01 06:34:44 +00:00
parent 33b73ac71d
commit 3601e813f3
13 changed files with 633 additions and 77 deletions

View File

@ -866,7 +866,7 @@ public class BrokerService implements Service {
if (isUseJmx()) {
ManagedRegionBroker managedBroker = (ManagedRegionBroker) regionBroker;
BrokerViewMBean view = new BrokerView(broker, managedBroker.getDestinationStatistics(), getMemoryManager());
BrokerViewMBean view = new BrokerView(managedBroker, getMemoryManager());
MBeanServer mbeanServer = getManagementContext().getMBeanServer();
ObjectName objectName = getBrokerObjectName();
mbeanServer.registerMBean(view, objectName);

View File

@ -16,19 +16,18 @@
*/
package org.apache.activemq.broker.jmx;
import javax.management.ObjectName;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.region.DestinationStatistics;
import org.apache.activemq.memory.UsageManager;
public class BrokerView implements BrokerViewMBean {
private final Broker broker;
private final DestinationStatistics destinationStatistics;
private final ManagedRegionBroker broker;
private final UsageManager usageManager;
public BrokerView(Broker broker, DestinationStatistics destinationStatistics, UsageManager usageManager) {
public BrokerView(ManagedRegionBroker broker, UsageManager usageManager) {
this.broker = broker;
this.destinationStatistics = destinationStatistics;
this.usageManager = usageManager;
}
@ -49,19 +48,19 @@ public class BrokerView implements BrokerViewMBean {
}
public long getTotalEnqueueCount() {
return destinationStatistics.getEnqueues().getCount();
return broker.getDestinationStatistics().getEnqueues().getCount();
}
public long getTotalDequeueCount() {
return destinationStatistics.getDequeues().getCount();
return broker.getDestinationStatistics().getDequeues().getCount();
}
public long getTotalConsumerCount() {
return destinationStatistics.getConsumers().getCount();
return broker.getDestinationStatistics().getConsumers().getCount();
}
public long getTotalMessages() {
return destinationStatistics.getMessages().getCount();
return broker.getDestinationStatistics().getMessages().getCount();
}
public long getTotalMessagesCached() {
return destinationStatistics.getMessagesCached().getCount();
return broker.getDestinationStatistics().getMessagesCached().getCount();
}
public int getMemoryPercentageUsed() {
@ -75,11 +74,47 @@ public class BrokerView implements BrokerViewMBean {
}
public void resetStatistics() {
destinationStatistics.reset();
broker.getDestinationStatistics().reset();
}
public void terminateJVM(int exitCode) {
System.exit(exitCode);
}
public ObjectName[] getTopics(){
return broker.getTopics();
}
public ObjectName[] getQueues(){
return broker.getQueues();
}
public ObjectName[] getTemporaryTopics(){
return broker.getTemporaryTopics();
}
public ObjectName[] getTemporaryQueues(){
return broker.getTemporaryQueues();
}
public ObjectName[] getTopicSubscribers(){
return broker.getTemporaryTopicSubscribers();
}
public ObjectName[] getDurableTopicSubscribers(){
return broker.getDurableTopicSubscribers();
}
public ObjectName[] getQueueSubscribers(){
return broker.getQueueSubscribers();
}
public ObjectName[] getTemporaryTopicSubscribers(){
return broker.getTemporaryTopicSubscribers();
}
public ObjectName[] getTemporaryQueueSubscribers(){
return broker.getTemporaryQueueSubscribers();
}
}

View File

@ -16,6 +16,7 @@
*/
package org.apache.activemq.broker.jmx;
import javax.management.ObjectName;
import org.apache.activemq.Service;
public interface BrokerViewMBean extends Service {
@ -37,4 +38,16 @@ public interface BrokerViewMBean extends Service {
public void terminateJVM(int exitCode);
public ObjectName[] getTopics();
public ObjectName[] getQueues();
public ObjectName[] getTemporaryTopics();
public ObjectName[] getTemporaryQueues();
public ObjectName[] getTopicSubscribers();
public ObjectName[] getDurableTopicSubscribers();
public ObjectName[] getQueueSubscribers();
public ObjectName[] getTemporaryTopicSubscribers();
public ObjectName[] getTemporaryQueueSubscribers();
}

View File

@ -0,0 +1,56 @@
/**
*
* Copyright 2005-2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.apache.activemq.broker.jmx;
import javax.management.openmbean.CompositeData;
import javax.management.openmbean.OpenDataException;
import javax.management.openmbean.TabularData;
import org.apache.activemq.broker.region.Subscription;
/**
* @version $Revision: 1.5 $
*/
public class DurableSubscriptionView extends SubscriptionView implements DurableSubscriptionViewMBean {
protected String subscriptionName;
public DurableSubscriptionView(Subscription sub){
super(sub);
this.subscriptionName = sub.getConsumerInfo().getSubcriptionName();
}
/**
* @return name of the durable consumer
*/
public String getSubscriptionName(){
return subscriptionName;
}
/**
* Browse messages for this durable subscriber
*
* @return messages
* @throws OpenDataException
*/
public CompositeData[] browse() throws OpenDataException{
return null;
}
/**
* Browse messages for this durable subscriber
*
* @return messages
* @throws OpenDataException
*/
public TabularData browseAsTable() throws OpenDataException{
return null;
}
}

View File

@ -0,0 +1,43 @@
/**
*
* Copyright 2005-2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.apache.activemq.broker.jmx;
import javax.management.openmbean.CompositeData;
import javax.management.openmbean.OpenDataException;
import javax.management.openmbean.TabularData;
/**
* @version $Revision: 1.5 $
*/
public interface DurableSubscriptionViewMBean extends SubscriptionViewMBean{
/**
* @return name of the durable consumer
*/
public String getSubscriptionName();
/**
* Browse messages for this durable subscriber
*
* @return messages
* @throws OpenDataException
*/
public CompositeData[] browse() throws OpenDataException;
/**
* Browse messages for this durable subscriber
*
* @return messages
* @throws OpenDataException
*/
public TabularData browseAsTable() throws OpenDataException;
}

View File

@ -1,21 +1,24 @@
/**
*
*
* Copyright 2005-2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.apache.activemq.broker.jmx;
import java.io.IOException;
import java.util.Hashtable;
import java.util.Map;
import java.util.Set;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.Queue;
@ -25,78 +28,207 @@ import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.broker.region.Topic;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.activemq.util.JMXSupport;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import java.io.IOException;
import java.util.Hashtable;
public class ManagedRegionBroker extends RegionBroker {
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
public class ManagedRegionBroker extends RegionBroker{
private static final Log log=LogFactory.getLog(ManagedRegionBroker.class);
private final MBeanServer mbeanServer;
private final ObjectName brokerObjectName;
private final Map topics=new ConcurrentHashMap();
private final Map queues=new ConcurrentHashMap();
private final Map temporaryQueues=new ConcurrentHashMap();
private final Map temporaryTopics=new ConcurrentHashMap();
private final Map queueSubscribers=new ConcurrentHashMap();
private final Map topicSubscribers=new ConcurrentHashMap();
private final Map durableTopicSubscribers=new ConcurrentHashMap();
private final Map temporaryQueueSubscribers=new ConcurrentHashMap();
private final Map temporaryTopicSubscribers=new ConcurrentHashMap();
public ManagedRegionBroker(BrokerService brokerService,MBeanServer mbeanServer, ObjectName brokerObjectName, TaskRunnerFactory taskRunnerFactory, UsageManager memoryManager, PersistenceAdapter adapter, PolicyMap policyMap) throws IOException {
super(brokerService,taskRunnerFactory, memoryManager, adapter, policyMap);
this.mbeanServer = mbeanServer;
this.brokerObjectName = brokerObjectName;
public ManagedRegionBroker(BrokerService brokerService,MBeanServer mbeanServer,ObjectName brokerObjectName,
TaskRunnerFactory taskRunnerFactory,UsageManager memoryManager,PersistenceAdapter adapter,
PolicyMap policyMap) throws IOException{
super(brokerService,taskRunnerFactory,memoryManager,adapter,policyMap);
this.mbeanServer=mbeanServer;
this.brokerObjectName=brokerObjectName;
}
protected Region createQueueRegion(UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory, PersistenceAdapter adapter, PolicyMap policyMap) {
return new ManagedQueueRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, adapter, policyMap);
}
protected Region createTempQueueRegion(UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory) {
return new ManagedTempQueueRegion(this, destinationStatistics, memoryManager, taskRunnerFactory);
}
protected Region createTempTopicRegion(UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory) {
return new ManagedTempTopicRegion(this, destinationStatistics, memoryManager, taskRunnerFactory);
}
protected Region createTopicRegion(UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory, PersistenceAdapter adapter, PolicyMap policyMap) {
return new ManagedTopicRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, adapter, policyMap);
protected Region createQueueRegion(UsageManager memoryManager,TaskRunnerFactory taskRunnerFactory,
PersistenceAdapter adapter,PolicyMap policyMap){
return new ManagedQueueRegion(this,destinationStatistics,memoryManager,taskRunnerFactory,adapter,policyMap);
}
public void register(ActiveMQDestination destName, Destination destination) throws Throwable {
protected Region createTempQueueRegion(UsageManager memoryManager,TaskRunnerFactory taskRunnerFactory){
return new ManagedTempQueueRegion(this,destinationStatistics,memoryManager,taskRunnerFactory);
}
protected Region createTempTopicRegion(UsageManager memoryManager,TaskRunnerFactory taskRunnerFactory){
return new ManagedTempTopicRegion(this,destinationStatistics,memoryManager,taskRunnerFactory);
}
protected Region createTopicRegion(UsageManager memoryManager,TaskRunnerFactory taskRunnerFactory,
PersistenceAdapter adapter,PolicyMap policyMap){
return new ManagedTopicRegion(this,destinationStatistics,memoryManager,taskRunnerFactory,adapter,policyMap);
}
public void register(ActiveMQDestination destName,Destination destination){
// Build the object name for the destination
Hashtable map = new Hashtable(brokerObjectName.getKeyPropertyList());
Hashtable map=new Hashtable(brokerObjectName.getKeyPropertyList());
map.put("Type",JMXSupport.encodeObjectNamePart(destName.getDestinationTypeAsString()));
map.put("Destination", JMXSupport.encodeObjectNamePart(destName.getPhysicalName()));
ObjectName destObjectName= new ObjectName(brokerObjectName.getDomain(), map);
Object view;
if( destination instanceof Queue ) {
view = new QueueView((Queue) destination);
} else {
view = new TopicView((Topic) destination);
map.put("Destination",JMXSupport.encodeObjectNamePart(destName.getPhysicalName()));
try{
ObjectName destObjectName=new ObjectName(brokerObjectName.getDomain(),map);
DestinationView view;
if(destination instanceof Queue){
view=new QueueView((Queue) destination);
}else{
view=new TopicView((Topic) destination);
}
registerDestination(destObjectName,destName,view);
}catch(Exception e){
log.error("Failed to register destination "+destName,e);
}
mbeanServer.registerMBean(view, destObjectName);
}
public void unregister(ActiveMQDestination destName) throws Throwable {
public void unregister(ActiveMQDestination destName){
// Build the object name for the destination
Hashtable map = new Hashtable(brokerObjectName.getKeyPropertyList());
Hashtable map=new Hashtable(brokerObjectName.getKeyPropertyList());
map.put("Type",JMXSupport.encodeObjectNamePart(destName.getDestinationTypeAsString()));
map.put("Destination", JMXSupport.encodeObjectNamePart(destName.getPhysicalName()));
ObjectName destObjectName= new ObjectName(brokerObjectName.getDomain(), map);
mbeanServer.unregisterMBean(destObjectName);
map.put("Destination",JMXSupport.encodeObjectNamePart(destName.getPhysicalName()));
try{
ObjectName destObjectName=new ObjectName(brokerObjectName.getDomain(),map);
unregisterDestination(destObjectName);
}catch(Exception e){
log.error("Failed to unregister "+destName,e);
}
}
public void registerSubscription(Subscription sub) {
// TODO: Use this to expose subscriptions to the JMX bus for management
public void registerSubscription(Subscription sub){
Hashtable map=new Hashtable(brokerObjectName.getKeyPropertyList());
map.put("Type",JMXSupport.encodeObjectNamePart("Subscription"));
map.put("name",JMXSupport.encodeObjectNamePart(sub.getConsumerInfo().toString()));
try{
ObjectName objectName=new ObjectName(brokerObjectName.getDomain(),map);
SubscriptionView view;
if(sub.getConsumerInfo().isDurable()){
view=new DurableSubscriptionView(sub);
}else{
view=new SubscriptionView(sub);
}
registerSubscription(objectName,sub.getConsumerInfo(),view);
}catch(Exception e){
log.error("Failed to register subscription "+sub,e);
}
}
public void unregisterSubscription(Subscription sub) {
// TODO: Use this to expose subscriptions to the JMX bus for management
public void unregisterSubscription(Subscription sub){
Hashtable map=new Hashtable(brokerObjectName.getKeyPropertyList());
map.put("Type",JMXSupport.encodeObjectNamePart("Subscription"));
map.put("name",JMXSupport.encodeObjectNamePart(sub.getConsumerInfo().toString()));
try{
ObjectName objectName=new ObjectName(brokerObjectName.getDomain(),map);
unregisterSubscription(objectName);
}catch(Exception e){
log.error("Failed to unregister subscription "+sub,e);
}
}
protected void registerDestination(ObjectName key,ActiveMQDestination dest,DestinationView view) throws Exception{
if(dest.isQueue()){
if(dest.isTemporary()){
temporaryQueues.put(key,view);
}else{
queues.put(key,view);
}
}else{
if(dest.isTemporary()){
temporaryTopics.put(key,view);
}else{
topics.put(key,view);
}
}
mbeanServer.registerMBean(view,key);
}
protected void unregisterDestination(ObjectName key) throws Exception{
topics.remove(key);
queues.remove(key);
temporaryQueues.remove(key);
temporaryTopics.remove(key);
mbeanServer.unregisterMBean(key);
}
protected void registerSubscription(ObjectName key,ConsumerInfo info,SubscriptionView view) throws Exception{
ActiveMQDestination dest=info.getDestination();
if(dest.isQueue()){
if(dest.isTemporary()){
temporaryQueueSubscribers.put(key,view);
}else{
queueSubscribers.put(key,view);
}
}else{
if(dest.isTemporary()){
temporaryTopicSubscribers.put(key,view);
}else{
if(info.isDurable()){
durableTopicSubscribers.put(key,view);
}else{
topicSubscribers.put(key,view);
}
}
}
mbeanServer.registerMBean(view,key);
}
protected void unregisterSubscription(ObjectName key) throws Exception{
queueSubscribers.remove(key);
topicSubscribers.remove(key);
durableTopicSubscribers.remove(key);
temporaryQueueSubscribers.remove(key);
temporaryTopicSubscribers.remove(key);
mbeanServer.unregisterMBean(key);
}
protected ObjectName[] getTopics(){
Set set = topics.keySet();
return (ObjectName[])set.toArray(new ObjectName[set.size()]);
}
protected ObjectName[] getQueues(){
Set set = queues.keySet();
return (ObjectName[])set.toArray(new ObjectName[set.size()]);
}
protected ObjectName[] getTemporaryTopics(){
Set set = temporaryTopics.keySet();
return (ObjectName[])set.toArray(new ObjectName[set.size()]);
}
protected ObjectName[] getTemporaryQueues(){
Set set = temporaryQueues.keySet();
return (ObjectName[])set.toArray(new ObjectName[set.size()]);
}
protected ObjectName[] getTopicSubscribers(){
Set set = topicSubscribers.keySet();
return (ObjectName[])set.toArray(new ObjectName[set.size()]);
}
protected ObjectName[] getDurableTopicSubscribers(){
Set set = durableTopicSubscribers.keySet();
return (ObjectName[])set.toArray(new ObjectName[set.size()]);
}
protected ObjectName[] getQueueSubscribers(){
Set set = queueSubscribers.keySet();
return (ObjectName[])set.toArray(new ObjectName[set.size()]);
}
protected ObjectName[] getTemporaryTopicSubscribers(){
Set set = temporaryTopicSubscribers.keySet();
return (ObjectName[])set.toArray(new ObjectName[set.size()]);
}
protected ObjectName[] getTemporaryQueueSubscribers(){
Set set = temporaryQueueSubscribers.keySet();
return (ObjectName[])set.toArray(new ObjectName[set.size()]);
}
}

View File

@ -0,0 +1,154 @@
/**
*
* Copyright 2005-2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.jmx;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ConsumerInfo;
/**
* @version $Revision: 1.5 $
*/
public class SubscriptionView implements SubscriptionViewMBean {
protected final Subscription subscription;
/**
* Constructior
* @param subs
*/
public SubscriptionView(Subscription subs){
this.subscription = subs;
}
/**
* @return the id of the Connection the Subscription is on
*/
public String getConnectionId(){
ConsumerInfo info = subscription.getConsumerInfo();
if (info != null){
return info.getConsumerId().getConnectionId();
}
return "NOTSET";
}
/**
* @return the id of the Session the subscription is on
*/
public long getSessionId(){
ConsumerInfo info = subscription.getConsumerInfo();
if (info != null){
return info.getConsumerId().getSessionId();
}
return 0;
}
/**
* @return the id of the Subscription
*/
public long getSubcriptionId(){
ConsumerInfo info = subscription.getConsumerInfo();
if (info != null){
return info.getConsumerId().getValue();
}
return 0;
}
/**
* @return the destination name
*/
public String getDestinationName(){
ConsumerInfo info = subscription.getConsumerInfo();
if (info != null){
ActiveMQDestination dest = info.getDestination();
return dest.getPhysicalName();
}
return "NOTSET";
}
/**
* @return true if the destination is a Queue
*/
public boolean isDestinationQueue(){
ConsumerInfo info = subscription.getConsumerInfo();
if (info != null){
ActiveMQDestination dest = info.getDestination();
return dest.isQueue();
}
return false;
}
/**
* @return true of the destination is a Topic
*/
public boolean isDestinationTopic(){
ConsumerInfo info = subscription.getConsumerInfo();
if (info != null){
ActiveMQDestination dest = info.getDestination();
return dest.isTopic();
}
return false;
}
/**
* @return true if the destination is temporary
*/
public boolean isDestinationTemporary(){
ConsumerInfo info = subscription.getConsumerInfo();
if (info != null){
ActiveMQDestination dest = info.getDestination();
return dest.isTemporary();
}
return false;
}
/**
* The subscription should release as may references as it can to help the garbage collector
* reclaim memory.
*/
public void gc(){
subscription.gc();
}
/**
* @return number of messages pending delivery
*/
public int getPending(){
return subscription.pending();
}
/**
* @return number of messages dispatched
*/
public int getDispatched(){
return subscription.dispatched();
}
/**
* @return number of messages delivered
*/
public int getDelivered(){
return subscription.delivered();
}
}

View File

@ -0,0 +1,74 @@
/**
*
* Copyright 2005-2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.apache.activemq.broker.jmx;
/**
* @version $Revision: 1.5 $
*/
public interface SubscriptionViewMBean{
/**
* @return the id of the Connection the Subscription is on
*/
public String getConnectionId();
/**
* @return the id of the Session the subscription is on
*/
public long getSessionId();
/**
* @return the id of the Subscription
*/
public long getSubcriptionId();
/**
* @return the destination name
*/
public String getDestinationName();
/**
* @return true if the destination is a Queue
*/
public boolean isDestinationQueue();
/**
* @return true of the destination is a Topic
*/
public boolean isDestinationTopic();
/**
* @return true if the destination is temporary
*/
public boolean isDestinationTemporary();
/**
* The subscription should release as may references as it can to help the garbage collector reclaim memory.
*/
public void gc();
/**
* @return number of messages pending delivery
*/
public int getPending();
/**
* @return number of messages dispatched
*/
public int getDispatched();
/**
* @return number of messages delivered
*/
public int getDelivered();
}

View File

@ -122,6 +122,16 @@ public class DurableTopicSubscription extends PrefetchSubscription {
super.add(node);
node.decrementReferenceCount();
}
public int pending(){
if (active){
return super.pending();
}
//TODO: need to get from store
return 0;
}
protected boolean canDispatch(MessageReference node) {
return active;

View File

@ -225,6 +225,18 @@ abstract public class PrefetchSubscription extends AbstractSubscription{
protected boolean isFull(){
return dispatched.size()-delivered>=info.getPrefetchSize()||preLoadSize>preLoadLimit;
}
public int pending(){
return matched.size() - dispatched.size();
}
public int dispatched(){
return dispatched.size();
}
public int delivered(){
return delivered;
}
protected void dispatchMatched() throws IOException{
if(!dispatching){

View File

@ -100,4 +100,19 @@ public interface Subscription {
*/
boolean isSlaveBroker();
/**
* @return number of messages pending delivery
*/
int pending();
/**
* @return number of messages dispatched
*/
int dispatched();
/**
* @return number of messages delivered
*/
int delivered();
}

View File

@ -110,6 +110,18 @@ public class TopicSubscription extends AbstractSubscription {
throw new JMSException("Invalid acknowledgment: "+ack);
}
public int pending(){
return matched.size() - dispatched;
}
public int dispatched(){
return dispatched;
}
public int delivered(){
return delivered;
}
private boolean isFull() {
return dispatched-delivered >= info.getPrefetchSize();
}

View File

@ -104,15 +104,15 @@ class TopicBridge extends DestinationBridge{
}
/**
* @return Returns the consumerName.
* @return Returns the subscriptionName.
*/
public String getConsumerName(){
return consumerName;
}
/**
* @param consumerName
* The consumerName to set.
* @param subscriptionName
* The subscriptionName to set.
*/
public void setConsumerName(String consumerName){
this.consumerName=consumerName;