diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTConnectTest.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTConnectTest.java index c924bf96c6..e14c135571 100644 --- a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTConnectTest.java +++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTConnectTest.java @@ -22,11 +22,16 @@ import java.io.IOException; import java.net.Socket; import java.util.Arrays; import java.util.Collection; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import javax.net.ssl.SSLSocketFactory; import org.apache.activemq.util.Wait; +import org.fusesource.mqtt.client.BlockingConnection; +import org.fusesource.mqtt.client.MQTT; import org.junit.After; import org.junit.Test; import org.junit.runner.RunWith; @@ -77,6 +82,37 @@ public class MQTTConnectTest extends MQTTTestSupport { return "transport.connectAttemptTimeout=1000"; } + @Test(timeout = 90000) + public void testParallelConnectPlain() throws Exception { + final int THREAD_COUNT = 16; + final int CONNECTION_COUNT = 100; + ExecutorService executorService = Executors.newFixedThreadPool(THREAD_COUNT); + final AtomicInteger clientIdGemeratpr = new AtomicInteger(); + for (int i = 0; i < CONNECTION_COUNT; i++) { + executorService.execute(new Runnable() { + @Override + public void run() { + + try { + MQTT mqtt = createMQTTConnection(); + mqtt.setClientId("client-" + clientIdGemeratpr.incrementAndGet()); + mqtt.setCleanSession(true); + + BlockingConnection connection = mqtt.blockingConnection(); + connection.connect(); + connection.disconnect(); + } catch (Exception e) { + LOG.error("unexpected exception on connect/disconnect", e); + exceptions.add(e); + } + } + }); + } + + executorService.shutdown(); + assertTrue("executor done on time", executorService.awaitTermination(60, TimeUnit.SECONDS)); + } + @Test(timeout = 60 * 1000) public void testInactivityMonitor() throws Exception {