https://issues.apache.org/jira/browse/AMQ-3081 - Durable subscriptions are not removed from mbean

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1044465 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Bosanac Dejan 2010-12-10 18:14:05 +00:00
parent bf20aa514c
commit 054fc6aca5
2 changed files with 148 additions and 115 deletions

View File

@ -266,6 +266,7 @@ public class ManagedRegionBroker extends RegionBroker {
ObjectName inactiveName = subscriptionKeys.get(subscriptionKey);
if (inactiveName != null) {
inactiveDurableTopicSubscribers.remove(inactiveName);
managementContext.unregisterMBean(inactiveName);
}
} catch (Exception e) {
LOG.error("Failed to unregister subscription " + sub, e);

View File

@ -21,16 +21,20 @@ import org.apache.activemq.TestSupport;
import org.apache.activemq.broker.BrokerFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.DurableTopicSubscription;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.RemoveSubscriptionInfo;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
import javax.jms.Connection;
import javax.jms.Session;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import javax.management.*;
import java.io.File;
import java.lang.management.ManagementFactory;
import java.util.List;
public class DurableSubscriptionUnsubscribeTest extends TestSupport {
@ -63,142 +67,100 @@ public class DurableSubscriptionUnsubscribeTest extends TestSupport {
}
public void doJMXUnsubscribe(boolean restart) throws Exception {
for (int i = 0; i < 100; i++) {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
session.createDurableSubscriber(topic, "SubsId" + i);
session.close();
}
createSubscriptions();
Thread.sleep(2 * 1000);
Thread.sleep(1000);
assertCount(100, 0);
if (restart) {
stopBroker();
startBroker(false);
restartBroker();
assertCount(100, 0);
}
MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
ObjectName[] subscriptions = broker.getAdminView().getDurableTopicSubscribers();
ObjectName[] inactive = broker.getAdminView().getInactiveDurableTopicSubscribers();
ObjectName[] subs = broker.getAdminView().getInactiveDurableTopicSubscribers();
for (ObjectName subscription: subscriptions) {
mbs.invoke(subscription, "destroy", null, null);
}
for (ObjectName subscription: inactive) {
mbs.invoke(subscription, "destroy", null, null);
for (int i = 0; i < subs.length; i++) {
ObjectName sub = subs[i];
mbs.invoke(sub, "destroy", null, null);
if (i % 20 == 0) {
Thread.sleep(1000);
assertCount(100 - i - 1, 0);
}
}
Thread.sleep(2 * 1000);
subscriptions = broker.getAdminView().getDurableTopicSubscribers();
assertEquals(0, subscriptions.length);
subscriptions = broker.getAdminView().getInactiveDurableTopicSubscribers();
assertEquals(0, subscriptions.length);
}
public void testInactiveSubscriptions() throws Exception {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
session.createDurableSubscriber(topic, "SubsId");
ObjectName[] subscriptions = broker.getAdminView().getDurableTopicSubscribers();
assertEquals(1, subscriptions.length);
subscriptions = broker.getAdminView().getInactiveDurableTopicSubscribers();
assertEquals(0, subscriptions.length);
session.close();
Thread.sleep(1000);
subscriptions = broker.getAdminView().getDurableTopicSubscribers();
assertEquals(0, subscriptions.length);
subscriptions = broker.getAdminView().getInactiveDurableTopicSubscribers();
assertEquals(1, subscriptions.length);
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
session.createDurableSubscriber(topic, "SubsId");
Thread.sleep(1000);
subscriptions = broker.getAdminView().getDurableTopicSubscribers();
assertEquals(1, subscriptions.length);
subscriptions = broker.getAdminView().getInactiveDurableTopicSubscribers();
assertEquals(0, subscriptions.length);
session.close();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Thread.sleep(1000);
subscriptions = broker.getAdminView().getDurableTopicSubscribers();
assertEquals(0, subscriptions.length);
subscriptions = broker.getAdminView().getInactiveDurableTopicSubscribers();
assertEquals(1, subscriptions.length);
session.unsubscribe("SubsId");
Thread.sleep(1000);
subscriptions = broker.getAdminView().getDurableTopicSubscribers();
assertEquals(0, subscriptions.length);
subscriptions = broker.getAdminView().getInactiveDurableTopicSubscribers();
assertEquals(0, subscriptions.length);
session.close();
Thread.sleep(1000);
assertCount(0, 0);
if (restart) {
restartBroker();
assertCount(0, 0);
}
}
public void doConnectionUnsubscribe(boolean restart) throws Exception {
for (int i = 0; i < 100; i++) {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
session.createDurableSubscriber(topic, "SubsId" + i);
session.close();
}
createSubscriptions();
Thread.sleep(2 * 1000);
Thread.sleep(1000);
assertCount(100, 0);
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
session.createDurableSubscriber(topic, "SubsId1");
Thread.sleep(1000);
assertCount(100, 1);
Session session2 = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
session2.createDurableSubscriber(topic, "SubsId2");
Thread.sleep(1000);
assertCount(100, 2);
session.close();
Thread.sleep(1000);
assertCount(100, 1);
session2.close();
Thread.sleep(1000);
assertCount(100, 0);
if (restart) {
stopBroker();
startBroker(false);
restartBroker();
assertCount(100, 0);
}
ObjectName[] subscriptions = broker.getAdminView().getDurableTopicSubscribers();
assertEquals(0, subscriptions.length);
subscriptions = broker.getAdminView().getInactiveDurableTopicSubscribers();
assertEquals(100, subscriptions.length);
for (int i = 0; i < 100; i++) {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
session.unsubscribe("SubsId" + i);
session.close();
if (i % 20 == 0) {
Thread.sleep(1000);
assertCount(100 - i - 1, 0);
}
}
Thread.sleep(2 * 1000);
Thread.sleep(1000);
assertCount(0, 0);
subscriptions = broker.getAdminView().getDurableTopicSubscribers();
assertEquals(0, subscriptions.length);
subscriptions = broker.getAdminView().getInactiveDurableTopicSubscribers();
assertEquals(0, subscriptions.length);
if (restart) {
restartBroker();
assertCount(0, 0);
}
}
public void doDirectUnsubscribe(boolean restart) throws Exception {
for (int i = 0; i < 100; i++) {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
session.createDurableSubscriber(topic, "SubsId" + i);
session.close();
}
createSubscriptions();
Thread.sleep(2 * 1000);
Thread.sleep(1000);
assertCount(100, 0);
if (restart) {
stopBroker();
startBroker(false);
restartBroker();
assertCount(100, 0);
}
for (int i = 0; i < 100; i++) {
@ -209,19 +171,78 @@ public class DurableSubscriptionUnsubscribeTest extends TestSupport {
context.setBroker(broker.getRegionBroker());
context.setClientId(getName());
broker.getRegionBroker().removeSubscription(context, info);
if (i % 20 == 0) {
assertCount(100 - i - 1, 0);
}
}
Thread.sleep(2 * 1000);
assertCount(0, 0);
if (restart) {
restartBroker();
assertCount(0, 0);
}
}
private void createSubscriptions() throws Exception {
for (int i = 0; i < 100; i++) {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
session.createDurableSubscriber(topic, "SubsId" + i);
session.close();
}
}
private void assertCount(int all, int active) throws Exception {
int inactive = all - active;
// broker check
Destination destination = broker.getDestination(topic);
List<Subscription> subs = destination.getConsumers();
int cActive = 0, cInactive = 0;
for (Subscription sub: subs) {
if (sub instanceof DurableTopicSubscription) {
DurableTopicSubscription durable = (DurableTopicSubscription) sub;
if (durable.isActive())
cActive++;
else
cInactive++;
}
}
assertEquals(active, cActive);
assertEquals(inactive, cInactive);
// admin view
ObjectName[] subscriptions = broker.getAdminView().getDurableTopicSubscribers();
assertEquals(0, subscriptions.length);
assertEquals(active, subscriptions.length);
subscriptions = broker.getAdminView().getInactiveDurableTopicSubscribers();
assertEquals(0, subscriptions.length);
assertEquals(inactive, subscriptions.length);
// check the strange false MBean
if (all == 0)
assertEquals(0, countMBean());
}
private int countMBean() throws MalformedObjectNameException, InstanceNotFoundException {
int count = 0;
MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
for (int i = 0; i < 100; i++) {
String name = "org.apache.activemq:BrokerName=" + getName() + ",Type=Subscription,active=false,name=" + getName() + "_SubsId" + i;
ObjectName sub = new ObjectName(name);
try {
ObjectInstance oi = mbs.getObjectInstance(sub);
count++;
}
catch (InstanceNotFoundException ignore) {
// this should happen
}
}
return count;
}
private void startBroker(boolean deleteMessages) throws Exception {
broker = BrokerFactory.createBroker("broker:(vm://localhost)");
broker = BrokerFactory.createBroker("broker:(vm://" + getName() + ")");
broker.setUseJmx(true);
broker.setBrokerName(getName());
@ -233,7 +254,11 @@ public class DurableSubscriptionUnsubscribeTest extends TestSupport {
broker.setDeleteAllMessagesOnStartup(true);
}
broker.setKeepDurableSubsActive(true);
broker.start();
broker.waitUntilStarted();
connection = createConnection();
}
@ -243,11 +268,18 @@ public class DurableSubscriptionUnsubscribeTest extends TestSupport {
connection.close();
connection = null;
if (broker != null)
if (broker != null) {
broker.stop();
broker.waitUntilStopped();
}
broker = null;
}
private void restartBroker() throws Exception {
stopBroker();
startBroker(false);
}
protected ActiveMQConnectionFactory createConnectionFactory() throws Exception {
return new ActiveMQConnectionFactory("vm://" + getName() + "?waitForStart=5000&create=false");
}