diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java b/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java index bee981631a..4a1ba20f39 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java @@ -266,6 +266,7 @@ public class ManagedRegionBroker extends RegionBroker { ObjectName inactiveName = subscriptionKeys.get(subscriptionKey); if (inactiveName != null) { inactiveDurableTopicSubscribers.remove(inactiveName); + managementContext.unregisterMBean(inactiveName); } } catch (Exception e) { LOG.error("Failed to unregister subscription " + sub, e); diff --git a/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionUnsubscribeTest.java b/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionUnsubscribeTest.java index 87f0d78e51..dcb7334eea 100644 --- a/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionUnsubscribeTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionUnsubscribeTest.java @@ -21,16 +21,20 @@ import org.apache.activemq.TestSupport; import org.apache.activemq.broker.BrokerFactory; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.ConnectionContext; +import org.apache.activemq.broker.region.Destination; +import org.apache.activemq.broker.region.DurableTopicSubscription; +import org.apache.activemq.broker.region.Subscription; import org.apache.activemq.command.ActiveMQTopic; import org.apache.activemq.command.RemoveSubscriptionInfo; import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter; import javax.jms.Connection; import javax.jms.Session; -import javax.management.MBeanServer; -import javax.management.ObjectName; +import javax.management.*; import java.io.File; import java.lang.management.ManagementFactory; +import java.util.List; + public class DurableSubscriptionUnsubscribeTest extends TestSupport { @@ -63,142 +67,100 @@ public class DurableSubscriptionUnsubscribeTest extends TestSupport { } public void doJMXUnsubscribe(boolean restart) throws Exception { - for (int i = 0; i < 100; i++) { - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - session.createDurableSubscriber(topic, "SubsId" + i); - session.close(); - } + createSubscriptions(); - Thread.sleep(2 * 1000); + Thread.sleep(1000); + assertCount(100, 0); if (restart) { - stopBroker(); - startBroker(false); + restartBroker(); + assertCount(100, 0); } MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); - ObjectName[] subscriptions = broker.getAdminView().getDurableTopicSubscribers(); - ObjectName[] inactive = broker.getAdminView().getInactiveDurableTopicSubscribers(); + ObjectName[] subs = broker.getAdminView().getInactiveDurableTopicSubscribers(); - for (ObjectName subscription: subscriptions) { - mbs.invoke(subscription, "destroy", null, null); - } - for (ObjectName subscription: inactive) { - mbs.invoke(subscription, "destroy", null, null); + for (int i = 0; i < subs.length; i++) { + ObjectName sub = subs[i]; + mbs.invoke(sub, "destroy", null, null); + + if (i % 20 == 0) { + Thread.sleep(1000); + assertCount(100 - i - 1, 0); + } } - Thread.sleep(2 * 1000); - - subscriptions = broker.getAdminView().getDurableTopicSubscribers(); - assertEquals(0, subscriptions.length); - - subscriptions = broker.getAdminView().getInactiveDurableTopicSubscribers(); - assertEquals(0, subscriptions.length); - } - - public void testInactiveSubscriptions() throws Exception { - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - session.createDurableSubscriber(topic, "SubsId"); - - ObjectName[] subscriptions = broker.getAdminView().getDurableTopicSubscribers(); - assertEquals(1, subscriptions.length); - - subscriptions = broker.getAdminView().getInactiveDurableTopicSubscribers(); - assertEquals(0, subscriptions.length); - - session.close(); - - Thread.sleep(1000); - - subscriptions = broker.getAdminView().getDurableTopicSubscribers(); - assertEquals(0, subscriptions.length); - - subscriptions = broker.getAdminView().getInactiveDurableTopicSubscribers(); - assertEquals(1, subscriptions.length); - - session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - session.createDurableSubscriber(topic, "SubsId"); - - Thread.sleep(1000); - - subscriptions = broker.getAdminView().getDurableTopicSubscribers(); - assertEquals(1, subscriptions.length); - - subscriptions = broker.getAdminView().getInactiveDurableTopicSubscribers(); - assertEquals(0, subscriptions.length); - - session.close(); - session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - - Thread.sleep(1000); - - subscriptions = broker.getAdminView().getDurableTopicSubscribers(); - assertEquals(0, subscriptions.length); - - subscriptions = broker.getAdminView().getInactiveDurableTopicSubscribers(); - assertEquals(1, subscriptions.length); - - session.unsubscribe("SubsId"); - - Thread.sleep(1000); - - subscriptions = broker.getAdminView().getDurableTopicSubscribers(); - assertEquals(0, subscriptions.length); - - subscriptions = broker.getAdminView().getInactiveDurableTopicSubscribers(); - assertEquals(0, subscriptions.length); - - session.close(); + Thread.sleep(1000); + assertCount(0, 0); + if (restart) { + restartBroker(); + assertCount(0, 0); + } } public void doConnectionUnsubscribe(boolean restart) throws Exception { - for (int i = 0; i < 100; i++) { - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - session.createDurableSubscriber(topic, "SubsId" + i); - session.close(); - } + createSubscriptions(); - Thread.sleep(2 * 1000); + Thread.sleep(1000); + assertCount(100, 0); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + session.createDurableSubscriber(topic, "SubsId1"); + + Thread.sleep(1000); + assertCount(100, 1); + + Session session2 = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + session2.createDurableSubscriber(topic, "SubsId2"); + + Thread.sleep(1000); + assertCount(100, 2); + + session.close(); + + Thread.sleep(1000); + assertCount(100, 1); + + session2.close(); + + Thread.sleep(1000); + assertCount(100, 0); if (restart) { - stopBroker(); - startBroker(false); + restartBroker(); + assertCount(100, 0); } - ObjectName[] subscriptions = broker.getAdminView().getDurableTopicSubscribers(); - assertEquals(0, subscriptions.length); - - subscriptions = broker.getAdminView().getInactiveDurableTopicSubscribers(); - assertEquals(100, subscriptions.length); - for (int i = 0; i < 100; i++) { - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); session.unsubscribe("SubsId" + i); session.close(); + + if (i % 20 == 0) { + Thread.sleep(1000); + assertCount(100 - i - 1, 0); + } } - Thread.sleep(2 * 1000); + Thread.sleep(1000); + assertCount(0, 0); - subscriptions = broker.getAdminView().getDurableTopicSubscribers(); - assertEquals(0, subscriptions.length); - - subscriptions = broker.getAdminView().getInactiveDurableTopicSubscribers(); - assertEquals(0, subscriptions.length); + if (restart) { + restartBroker(); + assertCount(0, 0); + } } public void doDirectUnsubscribe(boolean restart) throws Exception { - for (int i = 0; i < 100; i++) { - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - session.createDurableSubscriber(topic, "SubsId" + i); - session.close(); - } + createSubscriptions(); - Thread.sleep(2 * 1000); + Thread.sleep(1000); + assertCount(100, 0); if (restart) { - stopBroker(); - startBroker(false); + restartBroker(); + assertCount(100, 0); } for (int i = 0; i < 100; i++) { @@ -209,19 +171,78 @@ public class DurableSubscriptionUnsubscribeTest extends TestSupport { context.setBroker(broker.getRegionBroker()); context.setClientId(getName()); broker.getRegionBroker().removeSubscription(context, info); + + if (i % 20 == 0) { + assertCount(100 - i - 1, 0); + } } - Thread.sleep(2 * 1000); + assertCount(0, 0); + if (restart) { + restartBroker(); + assertCount(0, 0); + } + } + + private void createSubscriptions() throws Exception { + for (int i = 0; i < 100; i++) { + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + session.createDurableSubscriber(topic, "SubsId" + i); + session.close(); + } + } + + + private void assertCount(int all, int active) throws Exception { + int inactive = all - active; + + // broker check + Destination destination = broker.getDestination(topic); + List subs = destination.getConsumers(); + int cActive = 0, cInactive = 0; + for (Subscription sub: subs) { + if (sub instanceof DurableTopicSubscription) { + DurableTopicSubscription durable = (DurableTopicSubscription) sub; + if (durable.isActive()) + cActive++; + else + cInactive++; + } + } + assertEquals(active, cActive); + assertEquals(inactive, cInactive); + + // admin view ObjectName[] subscriptions = broker.getAdminView().getDurableTopicSubscribers(); - assertEquals(0, subscriptions.length); - + assertEquals(active, subscriptions.length); subscriptions = broker.getAdminView().getInactiveDurableTopicSubscribers(); - assertEquals(0, subscriptions.length); + assertEquals(inactive, subscriptions.length); + + // check the strange false MBean + if (all == 0) + assertEquals(0, countMBean()); + } + + private int countMBean() throws MalformedObjectNameException, InstanceNotFoundException { + int count = 0; + MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); + for (int i = 0; i < 100; i++) { + String name = "org.apache.activemq:BrokerName=" + getName() + ",Type=Subscription,active=false,name=" + getName() + "_SubsId" + i; + ObjectName sub = new ObjectName(name); + try { + ObjectInstance oi = mbs.getObjectInstance(sub); + count++; + } + catch (InstanceNotFoundException ignore) { + // this should happen + } + } + return count; } private void startBroker(boolean deleteMessages) throws Exception { - broker = BrokerFactory.createBroker("broker:(vm://localhost)"); + broker = BrokerFactory.createBroker("broker:(vm://" + getName() + ")"); broker.setUseJmx(true); broker.setBrokerName(getName()); @@ -233,7 +254,11 @@ public class DurableSubscriptionUnsubscribeTest extends TestSupport { broker.setDeleteAllMessagesOnStartup(true); } + + broker.setKeepDurableSubsActive(true); + broker.start(); + broker.waitUntilStarted(); connection = createConnection(); } @@ -243,11 +268,18 @@ public class DurableSubscriptionUnsubscribeTest extends TestSupport { connection.close(); connection = null; - if (broker != null) + if (broker != null) { broker.stop(); + broker.waitUntilStopped(); + } broker = null; } + private void restartBroker() throws Exception { + stopBroker(); + startBroker(false); + } + protected ActiveMQConnectionFactory createConnectionFactory() throws Exception { return new ActiveMQConnectionFactory("vm://" + getName() + "?waitForStart=5000&create=false"); } @@ -273,4 +305,4 @@ public class DurableSubscriptionUnsubscribeTest extends TestSupport { rc.start(); return rc; } -} +} \ No newline at end of file