diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/AbstractMQTTTest.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/AbstractMQTTTest.java index b9760567a8..13a0e75d1d 100644 --- a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/AbstractMQTTTest.java +++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/AbstractMQTTTest.java @@ -97,4 +97,31 @@ public abstract class AbstractMQTTTest extends AutoFailTestSupport { provider.connect("tcp://localhost:"+mqttConnector.getConnectUri().getPort()); } + protected static interface Task { + public void run() throws Exception; + } + + protected void within(int time, TimeUnit unit, Task task) throws InterruptedException { + long timeMS = unit.toMillis(time); + long deadline = System.currentTimeMillis() + timeMS; + while (true) { + try { + task.run(); + return; + } catch (Throwable e) { + long remaining = deadline - System.currentTimeMillis(); + if( remaining <=0 ) { + if( e instanceof RuntimeException ) { + throw (RuntimeException)e; + } + if( e instanceof Error ) { + throw (Error)e; + } + throw new RuntimeException(e); + } + Thread.sleep(Math.min(timeMS/10, remaining)); + } + } + } + } \ No newline at end of file diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/PahoMQTNioTTest.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/PahoMQTNioTTest.java new file mode 100644 index 0000000000..b657c0ac1d --- /dev/null +++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/PahoMQTNioTTest.java @@ -0,0 +1,113 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.mqtt; + +import org.apache.activemq.ActiveMQConnection; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.TransportConnector; +import org.eclipse.paho.client.mqttv3.MqttClient; +import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.Session; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; + +public class PahoMQTNioTTest extends PahoMQTTTest { + + private static final Logger LOG = LoggerFactory.getLogger(PahoMQTNioTTest.class); + + @Override + protected String getProtocolScheme() { + return "mqtt+nio"; + } + + @Test(timeout=300000) + public void testLotsOfClients() throws Exception { + + final int CLIENTS = 100; + addMQTTConnector(); + TransportConnector openwireTransport = brokerService.addConnector("tcp://localhost:0"); + brokerService.start(); + + ActiveMQConnection activeMQConnection = (ActiveMQConnection) new ActiveMQConnectionFactory(openwireTransport.getConnectUri()).createConnection(); + activeMQConnection.start(); + Session s = activeMQConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumer = s.createConsumer(s.createTopic("test")); + + final AtomicInteger receiveCounter = new AtomicInteger(); + consumer.setMessageListener(new MessageListener() { + @Override + public void onMessage(Message message) { + receiveCounter.incrementAndGet(); + } + }); + + final AtomicReference asyncError = new AtomicReference(); + final CountDownLatch connectedDoneLatch = new CountDownLatch(CLIENTS); + final CountDownLatch disconnectDoneLatch = new CountDownLatch(CLIENTS); + final CountDownLatch sendBarrier = new CountDownLatch(1); + for( int i=0; i < CLIENTS; i++ ) { + Thread.sleep(10); + new Thread(null, null, "client:"+i) { + @Override + public void run() { + try { + MqttClient client = new MqttClient("tcp://localhost:" + mqttConnector.getConnectUri().getPort(), Thread.currentThread().getName(), new MemoryPersistence()); + client.connect(); + connectedDoneLatch.countDown(); + sendBarrier.await(); + client.publish("test", "hello".getBytes(), 1, false); + client.disconnect(); + client.close(); + } catch (Throwable e) { + e.printStackTrace(); + asyncError.set(e); + } finally { + disconnectDoneLatch.countDown(); + } + } + }.start(); + } + + connectedDoneLatch.await(); + assertNull("Async error: "+asyncError.get(),asyncError.get()); + System.out.println("All clients connected..."); + sendBarrier.countDown(); + + // We should eventually get all the messages. + within(30, TimeUnit.SECONDS, new Task() { + @Override + public void run() throws Exception { + assertTrue(receiveCounter.get() == CLIENTS); + } + }); + + disconnectDoneLatch.await(); + assertNull("Async error: "+asyncError.get(),asyncError.get()); + + } + + +} \ No newline at end of file diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/PahoMQTTTest.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/PahoMQTTTest.java index 5251567938..a8dfd03373 100644 --- a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/PahoMQTTTest.java +++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/PahoMQTTTest.java @@ -20,6 +20,7 @@ import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.broker.TransportConnector; import org.eclipse.paho.client.mqttv3.MqttClient; +import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -44,7 +45,7 @@ public class PahoMQTTTest extends AbstractMQTTTest { Session s = activeMQConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); MessageConsumer consumer = s.createConsumer(s.createTopic("test")); - MqttClient client = new MqttClient("tcp://localhost:" + mqttConnector.getConnectUri().getPort(), "clientid"); + MqttClient client = new MqttClient("tcp://localhost:" + mqttConnector.getConnectUri().getPort(), "clientid", new MemoryPersistence()); client.connect(); client.publish("test", "hello".getBytes(), 1, false);