mirror of https://github.com/apache/activemq.git
AMQ-928 - decrement consumer count only when a subscription was removed (add null check)
- add test git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@450371 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
45055d3532
commit
91bcbda6e0
|
@ -93,11 +93,10 @@ public class Topic implements Destination {
|
|||
public void addSubscription(ConnectionContext context, final Subscription sub) throws Exception {
|
||||
|
||||
sub.add(context, this);
|
||||
destinationStatistics.getConsumers().increment();
|
||||
|
||||
if ( !sub.getConsumerInfo().isDurable() ) {
|
||||
|
||||
destinationStatistics.getConsumers().increment();
|
||||
|
||||
|
||||
// Do a retroactive recovery if needed.
|
||||
if (sub.getConsumerInfo().isRetroactive()) {
|
||||
|
||||
|
@ -139,8 +138,10 @@ public class Topic implements Destination {
|
|||
public void deleteSubscription(ConnectionContext context, SubscriptionKey key) throws IOException {
|
||||
if (store != null) {
|
||||
store.deleteSubscription(key.clientId, key.subscriptionName);
|
||||
durableSubcribers.remove(key);
|
||||
destinationStatistics.getConsumers().decrement();
|
||||
Object removed = durableSubcribers.remove(key);
|
||||
if(removed != null) {
|
||||
destinationStatistics.getConsumers().decrement();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -19,6 +19,7 @@ package org.apache.activemq.broker.jmx;
|
|||
|
||||
import org.apache.activemq.EmbeddedBrokerTestSupport;
|
||||
import org.apache.activemq.broker.BrokerService;
|
||||
import org.apache.activemq.broker.region.Topic;
|
||||
|
||||
import javax.jms.Connection;
|
||||
import javax.jms.Message;
|
||||
|
@ -73,6 +74,7 @@ public class MBeanTest extends EmbeddedBrokerTestSupport {
|
|||
// messages on a queue
|
||||
assertQueueBrowseWorks();
|
||||
assertCreateAndDestroyDurableSubscriptions();
|
||||
assertConsumerCounts();
|
||||
}
|
||||
|
||||
public void testMoveMessagesBySelector() throws Exception {
|
||||
|
@ -205,6 +207,57 @@ public class MBeanTest extends EmbeddedBrokerTestSupport {
|
|||
assertEquals("Durable subscriber count", 1, broker.getDurableTopicSubscribers().length);
|
||||
}
|
||||
|
||||
protected void assertConsumerCounts() throws Exception {
|
||||
ObjectName brokerName = assertRegisteredObjectName(domain + ":Type=Broker,BrokerName=localhost");
|
||||
BrokerViewMBean broker = (BrokerViewMBean) MBeanServerInvocationHandler.newProxyInstance(mbeanServer, brokerName, BrokerViewMBean.class, true);
|
||||
|
||||
//create 2 topics
|
||||
broker.addTopic(getDestinationString() + "1");
|
||||
broker.addTopic(getDestinationString() + "2");
|
||||
|
||||
ObjectName topicObjName1 = assertRegisteredObjectName(domain + ":Type=Topic,BrokerName=localhost,Destination="+getDestinationString() + "1");
|
||||
ObjectName topicObjName2 = assertRegisteredObjectName(domain + ":Type=Topic,BrokerName=localhost,Destination="+getDestinationString() + "2");
|
||||
TopicViewMBean topic1 = (TopicViewMBean) MBeanServerInvocationHandler.newProxyInstance(mbeanServer, topicObjName1, TopicViewMBean.class, true);
|
||||
TopicViewMBean topic2 = (TopicViewMBean) MBeanServerInvocationHandler.newProxyInstance(mbeanServer, topicObjName2, TopicViewMBean.class, true);
|
||||
|
||||
assertEquals("topic1 Durable subscriber count", 0, topic1.getConsumerCount());
|
||||
assertEquals("topic2 Durable subscriber count", 0, topic2.getConsumerCount());
|
||||
|
||||
String topicName = getDestinationString();
|
||||
String selector = null;
|
||||
|
||||
//create 1 subscriber for each topic
|
||||
broker.createDurableSubscriber(clientID, "topic1.subscriber1", topicName + "1", selector);
|
||||
broker.createDurableSubscriber(clientID, "topic2.subscriber1", topicName + "2", selector);
|
||||
|
||||
assertEquals("topic1 Durable subscriber count", 1, topic1.getConsumerCount());
|
||||
assertEquals("topic2 Durable subscriber count", 1, topic2.getConsumerCount());
|
||||
|
||||
//create 1 more subscriber for topic1
|
||||
broker.createDurableSubscriber(clientID, "topic1.subscriber2", topicName + "1", selector);
|
||||
|
||||
assertEquals("topic1 Durable subscriber count", 2, topic1.getConsumerCount());
|
||||
assertEquals("topic2 Durable subscriber count", 1, topic2.getConsumerCount());
|
||||
|
||||
//destroy topic1 subscriber
|
||||
broker.destroyDurableSubscriber(clientID, "topic1.subscriber1");
|
||||
|
||||
assertEquals("topic1 Durable subscriber count", 1, topic1.getConsumerCount());
|
||||
assertEquals("topic2 Durable subscriber count", 1, topic2.getConsumerCount());
|
||||
|
||||
// destroy topic2 subscriber
|
||||
broker.destroyDurableSubscriber(clientID, "topic2.subscriber1");
|
||||
|
||||
assertEquals("topic1 Durable subscriber count", 1, topic1.getConsumerCount());
|
||||
assertEquals("topic2 Durable subscriber count", 0, topic2.getConsumerCount());
|
||||
|
||||
//destroy remaining topic1 subscriber
|
||||
broker.destroyDurableSubscriber(clientID, "topic1.subscriber2");
|
||||
|
||||
assertEquals("topic1 Durable subscriber count", 0, topic1.getConsumerCount());
|
||||
assertEquals("topic2 Durable subscriber count", 0, topic2.getConsumerCount());
|
||||
}
|
||||
|
||||
protected ObjectName assertRegisteredObjectName(String name) throws MalformedObjectNameException, NullPointerException {
|
||||
ObjectName objectName = new ObjectName(name);
|
||||
if (mbeanServer.isRegistered(objectName)) {
|
||||
|
|
Loading…
Reference in New Issue