mirror of https://github.com/apache/activemq.git
Add some additional checks for valid state.
(cherry picked from commit 3c342ffce4
)
This commit is contained in:
parent
d38c5906f6
commit
37b5dbf0ea
|
@ -27,14 +27,18 @@ import java.util.List;
|
|||
import java.util.Set;
|
||||
|
||||
import javax.jms.Connection;
|
||||
import javax.jms.JMSException;
|
||||
import javax.jms.MessageProducer;
|
||||
import javax.jms.Session;
|
||||
import javax.jms.TextMessage;
|
||||
import javax.jms.TopicSubscriber;
|
||||
import javax.management.MalformedObjectNameException;
|
||||
import javax.management.ObjectName;
|
||||
|
||||
import org.apache.activemq.ActiveMQConnectionFactory;
|
||||
import org.apache.activemq.broker.BrokerPlugin;
|
||||
import org.apache.activemq.broker.BrokerService;
|
||||
import org.apache.activemq.broker.jmx.TopicViewMBean;
|
||||
import org.apache.activemq.broker.region.Destination;
|
||||
import org.apache.activemq.broker.region.RegionBroker;
|
||||
import org.apache.activemq.broker.region.TopicRegion;
|
||||
|
@ -111,8 +115,26 @@ public class AMQ6254Test {
|
|||
assertNotNull("Message not received.", message);
|
||||
assertEquals("Hello A", message.getText());
|
||||
|
||||
assertTrue("Should have only one consumer", Wait.waitFor(new Wait.Condition() {
|
||||
|
||||
@Override
|
||||
public boolean isSatisified() throws Exception {
|
||||
return getProxyToTopic(topicA).getConsumerCount() == 1;
|
||||
}
|
||||
}));
|
||||
|
||||
subscriber.close();
|
||||
|
||||
assertTrue("Should have one message consumed", Wait.waitFor(new Wait.Condition() {
|
||||
|
||||
@Override
|
||||
public boolean isSatisified() throws Exception {
|
||||
return getProxyToTopic(topicA).getDequeueCount() == 1;
|
||||
}
|
||||
}));
|
||||
|
||||
connection.close();
|
||||
|
||||
assertTrue("Should have only one destination", Wait.waitFor(new Wait.Condition() {
|
||||
|
||||
@Override
|
||||
|
@ -122,6 +144,14 @@ public class AMQ6254Test {
|
|||
}
|
||||
}));
|
||||
|
||||
assertTrue("Should have only one inactive subscription", Wait.waitFor(new Wait.Condition() {
|
||||
|
||||
@Override
|
||||
public boolean isSatisified() throws Exception {
|
||||
return brokerService.getAdminView().getInactiveDurableTopicSubscribers().length == 1;
|
||||
}
|
||||
}));
|
||||
|
||||
// Restart broker
|
||||
brokerService.stop();
|
||||
brokerService.waitUntilStopped();
|
||||
|
@ -201,7 +231,7 @@ public class AMQ6254Test {
|
|||
BrokerService answer = new BrokerService();
|
||||
|
||||
answer.setKeepDurableSubsActive(true);
|
||||
answer.setUseJmx(false);
|
||||
answer.setUseJmx(true);
|
||||
answer.setPersistent(true);
|
||||
answer.setDeleteAllMessagesOnStartup(deleteAllMessages);
|
||||
answer.setAdvisorySupport(false);
|
||||
|
@ -308,4 +338,11 @@ public class AMQ6254Test {
|
|||
|
||||
return authorizationPlugin;
|
||||
}
|
||||
|
||||
protected TopicViewMBean getProxyToTopic(String name) throws MalformedObjectNameException, JMSException {
|
||||
ObjectName queueViewMBeanName = new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Topic,destinationName="+name);
|
||||
TopicViewMBean proxy = (TopicViewMBean) brokerService.getManagementContext()
|
||||
.newProxyInstance(queueViewMBeanName, TopicViewMBean.class, true);
|
||||
return proxy;
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue