diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/PahoMQTNioTTest.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/PahoMQTNioTTest.java index 3c9ab180c9..589f3bb1c5 100644 --- a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/PahoMQTNioTTest.java +++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/PahoMQTNioTTest.java @@ -16,6 +16,16 @@ */ package org.apache.activemq.transport.mqtt; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; + +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.Session; + import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.broker.TransportConnector; @@ -25,15 +35,6 @@ import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageListener; -import javax.jms.Session; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; - public class PahoMQTNioTTest extends PahoMQTTTest { private static final Logger LOG = LoggerFactory.getLogger(PahoMQTNioTTest.class); @@ -43,11 +44,11 @@ public class PahoMQTNioTTest extends PahoMQTTTest { return "mqtt+nio"; } - @Test(timeout=300000) + @Test(timeout = 300000) public void testLotsOfClients() throws Exception { final int CLIENTS = Integer.getInteger("PahoMQTNioTTest.CLIENTS", 100); - LOG.info("Using: "+CLIENTS+" clients"); + LOG.info("Using: " + CLIENTS + " clients"); addMQTTConnector(); TransportConnector openwireTransport = brokerService.addConnector("tcp://localhost:0"); brokerService.start(); @@ -69,19 +70,20 @@ public class PahoMQTNioTTest extends PahoMQTTTest { final CountDownLatch connectedDoneLatch = new CountDownLatch(CLIENTS); final CountDownLatch disconnectDoneLatch = new CountDownLatch(CLIENTS); final CountDownLatch sendBarrier = new CountDownLatch(1); - for( int i=0; i < CLIENTS; i++ ) { + for (int i = 0; i < CLIENTS; i++) { Thread.sleep(10); - new Thread(null, null, "client:"+i) { + new Thread(null, null, "client:" + i) { @Override public void run() { try { - MqttClient client = new MqttClient("tcp://localhost:" + mqttConnector.getConnectUri().getPort(), Thread.currentThread().getName(), new MemoryPersistence()); + MqttClient client = new MqttClient("tcp://localhost:" + mqttConnector.getConnectUri().getPort(), Thread.currentThread().getName(), + new MemoryPersistence()); client.connect(); connectedDoneLatch.countDown(); sendBarrier.await(); - for( int i=0; i < 10; i++) { - Thread.sleep(1000); - client.publish("test", "hello".getBytes(), 1, false); + for (int i = 0; i < 10; i++) { + Thread.sleep(1000); + client.publish("test", "hello".getBytes(), 1, false); } client.disconnect(); client.close(); @@ -96,7 +98,7 @@ public class PahoMQTNioTTest extends PahoMQTTTest { } connectedDoneLatch.await(); - assertNull("Async error: "+asyncError.get(),asyncError.get()); + assertNull("Async error: " + asyncError.get(), asyncError.get()); sendBarrier.countDown(); LOG.info("All clients connected... waiting to receive sent messages..."); @@ -105,16 +107,13 @@ public class PahoMQTNioTTest extends PahoMQTTTest { within(30, TimeUnit.SECONDS, new Task() { @Override public void run() throws Exception { - assertTrue(receiveCounter.get() == CLIENTS*10); + assertTrue(receiveCounter.get() == CLIENTS * 10); } }); LOG.info("All messages received."); disconnectDoneLatch.await(); - assertNull("Async error: "+asyncError.get(),asyncError.get()); - + assertNull("Async error: " + asyncError.get(), asyncError.get()); } - - } diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/PahoMQTTTest.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/PahoMQTTTest.java index a8dfd03373..3af4b53ee6 100644 --- a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/PahoMQTTTest.java +++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/PahoMQTTTest.java @@ -16,24 +16,19 @@ */ package org.apache.activemq.transport.mqtt; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.Session; + import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.broker.TransportConnector; import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.Session; public class PahoMQTTTest extends AbstractMQTTTest { - private static final Logger LOG = LoggerFactory.getLogger(PahoMQTTTest.class); - - @Test(timeout=300000) public void testSendAndReceiveMQTT() throws Exception { addMQTTConnector(); @@ -55,6 +50,4 @@ public class PahoMQTTTest extends AbstractMQTTTest { client.disconnect(); client.close(); } - - } \ No newline at end of file