diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ6254Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ6254Test.java index 9c9a35b8a9..f88fca9ba8 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ6254Test.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ6254Test.java @@ -27,14 +27,18 @@ import java.util.List; import java.util.Set; import javax.jms.Connection; +import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import javax.jms.TopicSubscriber; +import javax.management.MalformedObjectNameException; +import javax.management.ObjectName; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.broker.BrokerPlugin; import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.jmx.TopicViewMBean; import org.apache.activemq.broker.region.Destination; import org.apache.activemq.broker.region.RegionBroker; import org.apache.activemq.broker.region.TopicRegion; @@ -111,8 +115,26 @@ public class AMQ6254Test { assertNotNull("Message not received.", message); assertEquals("Hello A", message.getText()); + assertTrue("Should have only one consumer", Wait.waitFor(new Wait.Condition() { + + @Override + public boolean isSatisified() throws Exception { + return getProxyToTopic(topicA).getConsumerCount() == 1; + } + })); + subscriber.close(); + assertTrue("Should have one message consumed", Wait.waitFor(new Wait.Condition() { + + @Override + public boolean isSatisified() throws Exception { + return getProxyToTopic(topicA).getDequeueCount() == 1; + } + })); + + connection.close(); + assertTrue("Should have only one destination", Wait.waitFor(new Wait.Condition() { @Override @@ -122,6 +144,14 @@ public class AMQ6254Test { } })); + assertTrue("Should have only one inactive subscription", Wait.waitFor(new Wait.Condition() { + + @Override + public boolean isSatisified() throws Exception { + return brokerService.getAdminView().getInactiveDurableTopicSubscribers().length == 1; + } + })); + // Restart broker brokerService.stop(); brokerService.waitUntilStopped(); @@ -201,7 +231,7 @@ public class AMQ6254Test { BrokerService answer = new BrokerService(); answer.setKeepDurableSubsActive(true); - answer.setUseJmx(false); + answer.setUseJmx(true); answer.setPersistent(true); answer.setDeleteAllMessagesOnStartup(deleteAllMessages); answer.setAdvisorySupport(false); @@ -308,4 +338,11 @@ public class AMQ6254Test { return authorizationPlugin; } + + protected TopicViewMBean getProxyToTopic(String name) throws MalformedObjectNameException, JMSException { + ObjectName queueViewMBeanName = new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Topic,destinationName="+name); + TopicViewMBean proxy = (TopicViewMBean) brokerService.getManagementContext() + .newProxyInstance(queueViewMBeanName, TopicViewMBean.class, true); + return proxy; + } }