mirror of https://github.com/apache/activemq.git
AMQ-6030 Adds some advisory topic specific tests to cover the composite
case for destination advisories.
This commit is contained in:
parent
ddd2812279
commit
7525729363
|
@ -32,6 +32,7 @@ import javax.jms.MessageProducer;
|
|||
import javax.jms.Queue;
|
||||
import javax.jms.Session;
|
||||
import javax.jms.Topic;
|
||||
import javax.management.ObjectName;
|
||||
|
||||
import org.apache.activemq.ActiveMQConnection;
|
||||
import org.apache.activemq.broker.BrokerPlugin;
|
||||
|
@ -344,4 +345,83 @@ public class StompAdvisoryTest extends StompTestSupport {
|
|||
c.stop();
|
||||
c.close();
|
||||
}
|
||||
|
||||
@Test(timeout = 60000)
|
||||
public void testDestinationAdvisoryTempQueue() throws Exception {
|
||||
|
||||
cf.setWatchTopicAdvisories(false);
|
||||
|
||||
stompConnect();
|
||||
stompConnection.connect("system", "manager");
|
||||
stompConnection.subscribe("/topic/ActiveMQ.Advisory.TempQueue", Stomp.Headers.Subscribe.AckModeValues.AUTO);
|
||||
|
||||
// Now connect via openwire and check we get the advisory
|
||||
Connection connection = cf.createConnection("system", "manager");
|
||||
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
session.createTemporaryQueue();
|
||||
connection.close();
|
||||
|
||||
StompFrame f = stompConnection.receive();
|
||||
LOG.debug(f.toString());
|
||||
assertEquals(f.getAction(),"MESSAGE");
|
||||
assertTrue("Should have a body", f.getBody().length() > 0);
|
||||
assertTrue(f.getBody().startsWith("{\"DestinationInfo\":"));
|
||||
}
|
||||
|
||||
@Test(timeout = 60000)
|
||||
public void testDestinationAdvisoryTempTopic() throws Exception {
|
||||
|
||||
cf.setWatchTopicAdvisories(false);
|
||||
|
||||
stompConnect();
|
||||
stompConnection.connect("system", "manager");
|
||||
stompConnection.subscribe("/topic/ActiveMQ.Advisory.TempTopic", Stomp.Headers.Subscribe.AckModeValues.AUTO);
|
||||
|
||||
// Now connect via openwire and check we get the advisory
|
||||
Connection connection = cf.createConnection("system", "manager");
|
||||
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
session.createTemporaryTopic();
|
||||
connection.close();
|
||||
|
||||
StompFrame f = stompConnection.receive();
|
||||
LOG.debug(f.toString());
|
||||
assertEquals(f.getAction(),"MESSAGE");
|
||||
assertTrue("Should have a body", f.getBody().length() > 0);
|
||||
assertTrue(f.getBody().startsWith("{\"DestinationInfo\":"));
|
||||
}
|
||||
|
||||
@Test(timeout = 60000)
|
||||
public void testDestinationAdvisoryCompositeTempDestinations() throws Exception {
|
||||
|
||||
cf.setWatchTopicAdvisories(true);
|
||||
|
||||
stompConnect();
|
||||
stompConnection.connect("system", "manager");
|
||||
stompConnection.subscribe("/topic/ActiveMQ.Advisory.TempTopic,/topic/ActiveMQ.Advisory.TempQueue", Stomp.Headers.Subscribe.AckModeValues.AUTO);
|
||||
|
||||
// Now connect via openwire and check we get the advisory
|
||||
Connection connection = cf.createConnection("system", "manager");
|
||||
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
session.createTemporaryTopic();
|
||||
session.createTemporaryQueue();
|
||||
|
||||
ObjectName[] topicSubscribers = brokerService.getAdminView().getTopicSubscribers();
|
||||
for (ObjectName subscription : topicSubscribers) {
|
||||
LOG.info("Topic Subscription: {}", subscription);
|
||||
}
|
||||
|
||||
connection.close();
|
||||
|
||||
StompFrame f = stompConnection.receive();
|
||||
LOG.debug(f.toString());
|
||||
assertEquals(f.getAction(),"MESSAGE");
|
||||
assertTrue("Should have a body", f.getBody().length() > 0);
|
||||
assertTrue(f.getBody().startsWith("{\"DestinationInfo\":"));
|
||||
|
||||
f = stompConnection.receive();
|
||||
LOG.debug(f.toString());
|
||||
assertEquals(f.getAction(),"MESSAGE");
|
||||
assertTrue("Should have a body", f.getBody().length() > 0);
|
||||
assertTrue(f.getBody().startsWith("{\"DestinationInfo\":"));
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue