mirror of https://github.com/apache/activemq.git
Add some additional checks for valid state.
This commit is contained in:
parent
297eadf746
commit
3c342ffce4
|
@ -27,14 +27,18 @@ import java.util.List;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
|
||||||
import javax.jms.Connection;
|
import javax.jms.Connection;
|
||||||
|
import javax.jms.JMSException;
|
||||||
import javax.jms.MessageProducer;
|
import javax.jms.MessageProducer;
|
||||||
import javax.jms.Session;
|
import javax.jms.Session;
|
||||||
import javax.jms.TextMessage;
|
import javax.jms.TextMessage;
|
||||||
import javax.jms.TopicSubscriber;
|
import javax.jms.TopicSubscriber;
|
||||||
|
import javax.management.MalformedObjectNameException;
|
||||||
|
import javax.management.ObjectName;
|
||||||
|
|
||||||
import org.apache.activemq.ActiveMQConnectionFactory;
|
import org.apache.activemq.ActiveMQConnectionFactory;
|
||||||
import org.apache.activemq.broker.BrokerPlugin;
|
import org.apache.activemq.broker.BrokerPlugin;
|
||||||
import org.apache.activemq.broker.BrokerService;
|
import org.apache.activemq.broker.BrokerService;
|
||||||
|
import org.apache.activemq.broker.jmx.TopicViewMBean;
|
||||||
import org.apache.activemq.broker.region.Destination;
|
import org.apache.activemq.broker.region.Destination;
|
||||||
import org.apache.activemq.broker.region.RegionBroker;
|
import org.apache.activemq.broker.region.RegionBroker;
|
||||||
import org.apache.activemq.broker.region.TopicRegion;
|
import org.apache.activemq.broker.region.TopicRegion;
|
||||||
|
@ -111,8 +115,26 @@ public class AMQ6254Test {
|
||||||
assertNotNull("Message not received.", message);
|
assertNotNull("Message not received.", message);
|
||||||
assertEquals("Hello A", message.getText());
|
assertEquals("Hello A", message.getText());
|
||||||
|
|
||||||
|
assertTrue("Should have only one consumer", Wait.waitFor(new Wait.Condition() {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean isSatisified() throws Exception {
|
||||||
|
return getProxyToTopic(topicA).getConsumerCount() == 1;
|
||||||
|
}
|
||||||
|
}));
|
||||||
|
|
||||||
subscriber.close();
|
subscriber.close();
|
||||||
|
|
||||||
|
assertTrue("Should have one message consumed", Wait.waitFor(new Wait.Condition() {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean isSatisified() throws Exception {
|
||||||
|
return getProxyToTopic(topicA).getDequeueCount() == 1;
|
||||||
|
}
|
||||||
|
}));
|
||||||
|
|
||||||
|
connection.close();
|
||||||
|
|
||||||
assertTrue("Should have only one destination", Wait.waitFor(new Wait.Condition() {
|
assertTrue("Should have only one destination", Wait.waitFor(new Wait.Condition() {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -122,6 +144,14 @@ public class AMQ6254Test {
|
||||||
}
|
}
|
||||||
}));
|
}));
|
||||||
|
|
||||||
|
assertTrue("Should have only one inactive subscription", Wait.waitFor(new Wait.Condition() {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean isSatisified() throws Exception {
|
||||||
|
return brokerService.getAdminView().getInactiveDurableTopicSubscribers().length == 1;
|
||||||
|
}
|
||||||
|
}));
|
||||||
|
|
||||||
// Restart broker
|
// Restart broker
|
||||||
brokerService.stop();
|
brokerService.stop();
|
||||||
brokerService.waitUntilStopped();
|
brokerService.waitUntilStopped();
|
||||||
|
@ -201,7 +231,7 @@ public class AMQ6254Test {
|
||||||
BrokerService answer = new BrokerService();
|
BrokerService answer = new BrokerService();
|
||||||
|
|
||||||
answer.setKeepDurableSubsActive(true);
|
answer.setKeepDurableSubsActive(true);
|
||||||
answer.setUseJmx(false);
|
answer.setUseJmx(true);
|
||||||
answer.setPersistent(true);
|
answer.setPersistent(true);
|
||||||
answer.setDeleteAllMessagesOnStartup(deleteAllMessages);
|
answer.setDeleteAllMessagesOnStartup(deleteAllMessages);
|
||||||
answer.setAdvisorySupport(false);
|
answer.setAdvisorySupport(false);
|
||||||
|
@ -308,4 +338,11 @@ public class AMQ6254Test {
|
||||||
|
|
||||||
return authorizationPlugin;
|
return authorizationPlugin;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected TopicViewMBean getProxyToTopic(String name) throws MalformedObjectNameException, JMSException {
|
||||||
|
ObjectName queueViewMBeanName = new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Topic,destinationName="+name);
|
||||||
|
TopicViewMBean proxy = (TopicViewMBean) brokerService.getManagementContext()
|
||||||
|
.newProxyInstance(queueViewMBeanName, TopicViewMBean.class, true);
|
||||||
|
return proxy;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue