diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/AbstractMQTTTest.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/AbstractMQTTTest.java index 550ca02053..ac6cfc5ed5 100644 --- a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/AbstractMQTTTest.java +++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/AbstractMQTTTest.java @@ -41,6 +41,7 @@ import static org.junit.Assert.assertArrayEquals; public abstract class AbstractMQTTTest extends AutoFailTestSupport { protected TransportConnector mqttConnector; + protected TransportConnector openwireConnector; public static final int AT_MOST_ONCE =0; public static final int AT_LEAST_ONCE = 1; @@ -382,6 +383,10 @@ public abstract class AbstractMQTTTest extends AutoFailTestSupport { mqttConnector = brokerService.addConnector(getProtocolScheme()+"://localhost:0?" + config); } + protected void addOpenwireConnector() throws Exception { + openwireConnector = brokerService.addConnector("tcp://localhost:0"); + } + protected void initializeConnection(MQTTClientProvider provider) throws Exception { provider.connect("tcp://localhost:"+mqttConnector.getConnectUri().getPort()); } diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java index e32568bea1..221abf31f4 100644 --- a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java +++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java @@ -18,6 +18,7 @@ package org.apache.activemq.transport.mqtt; import java.util.concurrent.TimeUnit; +import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.util.Wait; import org.fusesource.mqtt.client.BlockingConnection; import org.fusesource.mqtt.client.MQTT; @@ -30,6 +31,8 @@ import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.jms.*; + public class MQTTTest extends AbstractMQTTTest { private static final Logger LOG = LoggerFactory.getLogger(MQTTTest.class); @@ -76,10 +79,58 @@ public class MQTTTest extends AbstractMQTTTest { connection.disconnect(); } - @Test(timeout=300000) + @Test(timeout = 300000) + public void testJmsMapping() throws Exception { + addMQTTConnector(); + addOpenwireConnector(); + brokerService.start(); + + // start up jms consumer + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:" + openwireConnector.getConnectUri().getPort()); + Connection jmsConn = factory.createConnection(); + Session session = jmsConn.createSession(false, Session.AUTO_ACKNOWLEDGE); + Destination dest = session.createTopic("test.foo"); + MessageConsumer consumer = session.createConsumer(dest); + jmsConn.start(); + + // set up mqtt producer + MQTT mqtt = createMQTTConnection(); + mqtt.setClientId("foo3"); + mqtt.setKeepAlive((short)2); + final BlockingConnection connection = mqtt.blockingConnection(); + connection.connect(); + + int messagesToSend = 5; + + // publish + for (int i = 0; i < messagesToSend; ++i) { + connection.publish("test/foo", "hello world".getBytes(), QoS.AT_LEAST_ONCE, false); + } + + connection.disconnect(); + + for (int i = 0; i < messagesToSend; i++) { + + javax.jms.Message message = consumer.receive(2 * 1000); + assertNotNull(message); + assertTrue(message instanceof BytesMessage); + BytesMessage bytesMessage = (BytesMessage) message; + + int length = (int) bytesMessage.getBodyLength(); + byte[] buffer = new byte[length]; + bytesMessage.readBytes(buffer); + assertEquals("hello world", new String(buffer)); + } + + jmsConn.close(); + + } + + @Test(timeout = 300000) public void testSubscribeMultipleTopics() throws Exception { + byte[] payload = new byte[1024 * 32]; - for (int i = 0; i < payload.length; i++){ + for (int i = 0; i < payload.length; i++) { payload[i] = '2'; } @@ -108,7 +159,7 @@ public class MQTTTest extends AbstractMQTTTest { payload = message.getPayload(); String messageContent = new String(payload); LOG.info("Received message from topic: " + message.getTopic() + - " Message content: " + messageContent); + " Message content: " + messageContent); message.ack(); }