Updated test for https://issues.apache.org/jira/browse/AMQ-5290 after tbish commit for enhancement.

This commit is contained in:
Christian Posta 2014-08-06 17:02:17 -07:00
parent c42b874972
commit 0d9eedc658
1 changed files with 19 additions and 13 deletions

View File

@ -24,7 +24,6 @@ import org.apache.activemq.network.DiscoveryNetworkConnector;
import org.apache.activemq.network.NetworkConnector;
import org.apache.activemq.network.NetworkTestSupport;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.util.Wait;
import org.apache.commons.lang.ArrayUtils;
import org.fusesource.hawtdispatch.Dispatch;
import org.fusesource.mqtt.client.*;
@ -48,10 +47,6 @@ import java.util.concurrent.TimeUnit;
public class MQTTNetworkOfBrokersFailoverTest extends NetworkTestSupport {
private static final Logger LOG = LoggerFactory.getLogger(MQTTNetworkOfBrokersFailoverTest.class);
private final String subName = "Subscriber1";
private final String subName2 = "Subscriber2";
private final String topicName = "TEST.FOO";
private int localBrokerMQTTPort = -1;
private int remoteBrokerMQTTPort = -1;
@ -99,11 +94,12 @@ public class MQTTNetworkOfBrokersFailoverTest extends NetworkTestSupport {
BlockingConnection remoteConn = remoteMqtt.blockingConnection();
remoteConn.connect();
remoteConn.subscribe(new Topic[]{new Topic("foo/bar", QoS.AT_LEAST_ONCE)});
assertTrue("No destination detected!", consumerNetworked.await(1, TimeUnit.SECONDS));
assertQueueExistsOn(remoteBroker, "Consumer.foo_AT_LEAST_ONCE.VirtualTopic.foo.bar");
assertQueueExistsOn(broker, "Consumer.foo_AT_LEAST_ONCE.VirtualTopic.foo.bar");
remoteConn.disconnect();
consumerNetworked.await(1, TimeUnit.SECONDS);
assertOneDurableSubOn(remoteBroker, "foo");
assertOneDurableSubOn(broker, "NC_localhost_inbound_local");
// now we reconnect the same sub on the local broker, again with clean==0
MQTT localMqtt = createMQTTTcpConnection("foo", false, localBrokerMQTTPort);
@ -139,17 +135,16 @@ public class MQTTNetworkOfBrokersFailoverTest extends NetworkTestSupport {
final CountDownLatch latch = new CountDownLatch(1);
URI brokerUri = broker.getVmConnectorURI();
System.out.println(brokerUri.toASCIIString());
final ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(brokerUri.toASCIIString());
final Connection connection = cf.createConnection();
connection.start();
final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination dest = session.createTopic("ActiveMQ.Advisory.Consumer.Topic.foo.bar.>");
Destination dest = session.createTopic("ActiveMQ.Advisory.Consumer.Queue.Consumer.foo:AT_LEAST_ONCE.VirtualTopic.foo.bar");
MessageConsumer consumer = session.createConsumer(dest);
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
System.out.println("got message! " + message);
latch.countDown();
// shutdown this connection
Dispatch.getGlobalQueue().execute(new Runnable() {
@ -170,6 +165,13 @@ public class MQTTNetworkOfBrokersFailoverTest extends NetworkTestSupport {
return latch;
}
private void assertQueueExistsOn(BrokerService broker, String queueName) throws Exception {
BrokerViewMBean brokerView = broker.getAdminView();
ObjectName[] queueNames = brokerView.getQueues();
assertEquals(1, queueNames.length);
assertTrue(queueNames[0].toString().contains(queueName));
}
private void assertOneDurableSubOn(BrokerService broker, String subName) throws Exception {
BrokerViewMBean brokerView = broker.getAdminView();
@ -193,7 +195,7 @@ public class MQTTNetworkOfBrokersFailoverTest extends NetworkTestSupport {
broker.setBrokerName("local");
broker.setDataDirectory("target/activemq-data");
broker.setDeleteAllMessagesOnStartup(true);
TransportConnector tc = broker.addConnector("mqtt://localhost:0");
TransportConnector tc = broker.addConnector(getDefaultMQTTTransportConnectorUri());
localBrokerMQTTPort = tc.getConnectUri().getPort();
return broker;
}
@ -204,11 +206,15 @@ public class MQTTNetworkOfBrokersFailoverTest extends NetworkTestSupport {
broker.setPersistent(true);
broker.setDeleteAllMessagesOnStartup(true);
broker.setDataDirectory("target/activemq-data");
TransportConnector tc = broker.addConnector("mqtt://localhost:0");
TransportConnector tc = broker.addConnector(getDefaultMQTTTransportConnectorUri());
remoteBrokerMQTTPort = tc.getConnectUri().getPort();
return broker;
}
private String getDefaultMQTTTransportConnectorUri(){
return "mqtt://localhost:0?transport.subscriptionStrategyName=mqtt-virtual-topic-subscriptions";
}
private MQTT createMQTTTcpConnection(String clientId, boolean clean, int port) throws Exception {
MQTT mqtt = new MQTT();
mqtt.setConnectAttemptsMax(1);