Test code cleanup

This commit is contained in:
Timothy Bish 2014-07-23 15:57:52 -04:00
parent d223ea979f
commit 93f686c5cf
2 changed files with 26 additions and 34 deletions

View File

@ -16,6 +16,16 @@
*/ */
package org.apache.activemq.transport.mqtt; package org.apache.activemq.transport.mqtt;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.TransportConnector; import org.apache.activemq.broker.TransportConnector;
@ -25,15 +35,6 @@ import org.junit.Test;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
public class PahoMQTNioTTest extends PahoMQTTTest { public class PahoMQTNioTTest extends PahoMQTTTest {
private static final Logger LOG = LoggerFactory.getLogger(PahoMQTNioTTest.class); private static final Logger LOG = LoggerFactory.getLogger(PahoMQTNioTTest.class);
@ -43,11 +44,11 @@ public class PahoMQTNioTTest extends PahoMQTTTest {
return "mqtt+nio"; return "mqtt+nio";
} }
@Test(timeout=300000) @Test(timeout = 300000)
public void testLotsOfClients() throws Exception { public void testLotsOfClients() throws Exception {
final int CLIENTS = Integer.getInteger("PahoMQTNioTTest.CLIENTS", 100); final int CLIENTS = Integer.getInteger("PahoMQTNioTTest.CLIENTS", 100);
LOG.info("Using: "+CLIENTS+" clients"); LOG.info("Using: " + CLIENTS + " clients");
addMQTTConnector(); addMQTTConnector();
TransportConnector openwireTransport = brokerService.addConnector("tcp://localhost:0"); TransportConnector openwireTransport = brokerService.addConnector("tcp://localhost:0");
brokerService.start(); brokerService.start();
@ -69,19 +70,20 @@ public class PahoMQTNioTTest extends PahoMQTTTest {
final CountDownLatch connectedDoneLatch = new CountDownLatch(CLIENTS); final CountDownLatch connectedDoneLatch = new CountDownLatch(CLIENTS);
final CountDownLatch disconnectDoneLatch = new CountDownLatch(CLIENTS); final CountDownLatch disconnectDoneLatch = new CountDownLatch(CLIENTS);
final CountDownLatch sendBarrier = new CountDownLatch(1); final CountDownLatch sendBarrier = new CountDownLatch(1);
for( int i=0; i < CLIENTS; i++ ) { for (int i = 0; i < CLIENTS; i++) {
Thread.sleep(10); Thread.sleep(10);
new Thread(null, null, "client:"+i) { new Thread(null, null, "client:" + i) {
@Override @Override
public void run() { public void run() {
try { try {
MqttClient client = new MqttClient("tcp://localhost:" + mqttConnector.getConnectUri().getPort(), Thread.currentThread().getName(), new MemoryPersistence()); MqttClient client = new MqttClient("tcp://localhost:" + mqttConnector.getConnectUri().getPort(), Thread.currentThread().getName(),
new MemoryPersistence());
client.connect(); client.connect();
connectedDoneLatch.countDown(); connectedDoneLatch.countDown();
sendBarrier.await(); sendBarrier.await();
for( int i=0; i < 10; i++) { for (int i = 0; i < 10; i++) {
Thread.sleep(1000); Thread.sleep(1000);
client.publish("test", "hello".getBytes(), 1, false); client.publish("test", "hello".getBytes(), 1, false);
} }
client.disconnect(); client.disconnect();
client.close(); client.close();
@ -96,7 +98,7 @@ public class PahoMQTNioTTest extends PahoMQTTTest {
} }
connectedDoneLatch.await(); connectedDoneLatch.await();
assertNull("Async error: "+asyncError.get(),asyncError.get()); assertNull("Async error: " + asyncError.get(), asyncError.get());
sendBarrier.countDown(); sendBarrier.countDown();
LOG.info("All clients connected... waiting to receive sent messages..."); LOG.info("All clients connected... waiting to receive sent messages...");
@ -105,16 +107,13 @@ public class PahoMQTNioTTest extends PahoMQTTTest {
within(30, TimeUnit.SECONDS, new Task() { within(30, TimeUnit.SECONDS, new Task() {
@Override @Override
public void run() throws Exception { public void run() throws Exception {
assertTrue(receiveCounter.get() == CLIENTS*10); assertTrue(receiveCounter.get() == CLIENTS * 10);
} }
}); });
LOG.info("All messages received."); LOG.info("All messages received.");
disconnectDoneLatch.await(); disconnectDoneLatch.await();
assertNull("Async error: "+asyncError.get(),asyncError.get()); assertNull("Async error: " + asyncError.get(), asyncError.get());
} }
} }

View File

@ -16,24 +16,19 @@
*/ */
package org.apache.activemq.transport.mqtt; package org.apache.activemq.transport.mqtt;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.TransportConnector; import org.apache.activemq.broker.TransportConnector;
import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.junit.Test; import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
public class PahoMQTTTest extends AbstractMQTTTest { public class PahoMQTTTest extends AbstractMQTTTest {
private static final Logger LOG = LoggerFactory.getLogger(PahoMQTTTest.class);
@Test(timeout=300000) @Test(timeout=300000)
public void testSendAndReceiveMQTT() throws Exception { public void testSendAndReceiveMQTT() throws Exception {
addMQTTConnector(); addMQTTConnector();
@ -55,6 +50,4 @@ public class PahoMQTTTest extends AbstractMQTTTest {
client.disconnect(); client.disconnect();
client.close(); client.close();
} }
} }