From 0d8a5658369da566e9f2e3b2f47a9ae4cdf6b24d Mon Sep 17 00:00:00 2001 From: Martyn Taylor Date: Wed, 25 May 2016 17:57:38 +0100 Subject: [PATCH] Artemis-233 Support JMS BytesMessage -> MQTT --- .../protocol/mqtt/MQTTPublishManager.java | 6 +- .../integration/mqtt/imported/MQTTTest.java | 165 ++++++++---------- 2 files changed, 82 insertions(+), 89 deletions(-) diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java index 19bbff89ff..93d0bd267d 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java @@ -223,7 +223,11 @@ public class MQTTPublishManager { private int decideQoS(ServerMessage message, ServerConsumer consumer) { int subscriptionQoS = session.getSubscriptionManager().getConsumerQoSLevels().get(consumer.getID()); - int qos = message.getIntProperty(MQTTUtil.MQTT_QOS_LEVEL_KEY); + + int qos = 2; + if (message.containsProperty(MQTTUtil.MQTT_QOS_LEVEL_KEY)) { + qos = message.getIntProperty(MQTTUtil.MQTT_QOS_LEVEL_KEY); + } /* Subscription QoS is the maximum QoS the client is willing to receive for this subscription. If the message QoS is less than the subscription QoS then use it, otherwise use the subscription qos). */ diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java index 257cf8f2bd..b305c80a70 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java @@ -18,7 +18,9 @@ package org.apache.activemq.artemis.tests.integration.mqtt.imported; import javax.jms.BytesMessage; import javax.jms.Connection; +import javax.jms.Destination; import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; import javax.jms.Session; import java.lang.reflect.Field; import java.net.ProtocolException; @@ -50,7 +52,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.vertx.java.core.impl.ConcurrentHashSet; -/**QT +/** + * QT * MQTT Test imported from ActiveMQ MQTT component. */ public class MQTTTest extends MQTTTestSupport { @@ -1001,10 +1004,6 @@ public class MQTTTest extends MQTTTestSupport { notClean.disconnect(); } - /* TODO These Cross protocol tests were imported from ActiveMQ and need reworking to apply to Artemis. There is an - outstanding task to add cross protocol support. This task should rework these tests. The tests are included here - and commented out to ensure ActiveMQ and Artemis tests are in sync. */ - @Test(timeout = 60 * 1000) public void testSendMQTTReceiveJMS() throws Exception { doTestSendMQTTReceiveJMS("foo.*", "foo/bar"); @@ -1053,45 +1052,36 @@ public class MQTTTest extends MQTTTestSupport { provider.disconnect(); } -// @Test(timeout = 2 * 60 * 1000) -// public void testSendJMSReceiveMQTT() throws Exception { -// doTestSendJMSReceiveMQTT("foo.far"); -// } -// -// public void doTestSendJMSReceiveMQTT(String destinationName) throws Exception { -// final MQTTClientProvider provider = getMQTTClientProvider(); -// initializeConnection(provider); -// -// Connection connection = cf.createConnection(); -// connection.start(); -// -// Session s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); -// javax.jms.Queue queue = s.createQueue(destinationName); -// MessageProducer producer = s.createProducer(queue); -// -// // send retained message from JMS -// final String RETAINED = "RETAINED"; -// TextMessage sendMessage = s.createTextMessage(RETAINED); -// sendMessage.setIntProperty(MQTTUtil.MQTT_QOS_LEVEL_KEY, 0); -// producer.send(sendMessage); -// -// provider.subscribe("jms/queue/foo/+", AT_MOST_ONCE); -// byte[] message = provider.receive(10000); -// assertNotNull("Should get retained message", message); -// assertEquals(RETAINED, new String(message)); -// -// for (int i = 0; i < NUM_MESSAGES; i++) { -// String payload = "This is Test Message: " + i; -// sendMessage = s.createTextMessage(payload); -// producer.send(sendMessage); -// message = provider.receive(5000); -// assertNotNull("Should get a message", message); -// -// assertEquals(payload, new String(message)); -// } -// provider.disconnect(); -// connection.close(); -// } + @Test(timeout = 2 * 60 * 1000) + public void testSendJMSReceiveMQTT() throws Exception { + doTestSendJMSReceiveMQTT("foo.far"); + } + + public void doTestSendJMSReceiveMQTT(String destinationName) throws Exception { + final MQTTClientProvider provider = getMQTTClientProvider(); + initializeConnection(provider); + provider.subscribe("jms/queue/foo/+", AT_MOST_ONCE); + + Connection connection = cf.createConnection(); + connection.start(); + + Session s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + javax.jms.Queue queue = s.createQueue(destinationName); + MessageProducer producer = s.createProducer(queue); + + // send retained message from JMS + final byte[] bytes = new byte[]{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}; + BytesMessage bytesMessage = s.createBytesMessage(); + bytesMessage.writeBytes(bytes); + producer.send(bytesMessage); + + byte[] message = provider.receive(10000); + assertNotNull("Should get retained message", message); + assertArrayEquals(bytes, message); + + provider.disconnect(); + connection.close(); + } @Test(timeout = 60 * 1000) public void testPingKeepsInactivityMonitorAlive() throws Exception { @@ -1237,52 +1227,51 @@ public class MQTTTest extends MQTTTestSupport { connection2.disconnect(); } - // TODO As with other tests, this should be enabled as part of the cross protocol support with MQTT. - // @Test(timeout = 30 * 10000) - // public void testJmsMapping() throws Exception { - // doTestJmsMapping("test.foo"); - // } + @Test(timeout = 30 * 10000) + public void testJmsMapping() throws Exception { + doTestJmsMapping("test.foo"); + } // TODO As with other tests, this should be enabled as part of the cross protocol support with MQTT. - // public void doTestJmsMapping(String destinationName) throws Exception { - // // start up jms consumer - // Connection jmsConn = cf.createConnection(); - // Session session = jmsConn.createSession(false, Session.AUTO_ACKNOWLEDGE); - // Destination dest = session.createTopic(destinationName); - // MessageConsumer consumer = session.createConsumer(dest); - // jmsConn.start(); - // - // // set up mqtt producer - // MQTT mqtt = createMQTTConnection(); - // mqtt.setClientId("foo3"); - // mqtt.setKeepAlive((short) 2); - // final BlockingConnection connection = mqtt.blockingConnection(); - // connection.connect(); - // - // int messagesToSend = 5; - // - // // publish - // for (int i = 0; i < messagesToSend; ++i) { - // connection.publish("test/foo", "hello world".getBytes(), QoS.AT_LEAST_ONCE, false); - // } - // - // connection.disconnect(); - // - // for (int i = 0; i < messagesToSend; i++) { - // - // javax.jms.Message message = consumer.receive(2 * 1000); - // assertNotNull(message); - // assertTrue(message instanceof BytesMessage); - // BytesMessage bytesMessage = (BytesMessage) message; - // - // int length = (int) bytesMessage.getBodyLength(); - // byte[] buffer = new byte[length]; - // bytesMessage.readBytes(buffer); - // assertEquals("hello world", new String(buffer)); - // } - // - // jmsConn.close(); - // } + public void doTestJmsMapping(String destinationName) throws Exception { + // start up jms consumer + Connection jmsConn = cf.createConnection(); + Session session = jmsConn.createSession(false, Session.AUTO_ACKNOWLEDGE); + Destination dest = session.createQueue(destinationName); + MessageConsumer consumer = session.createConsumer(dest); + jmsConn.start(); + + // set up mqtt producer + MQTT mqtt = createMQTTConnection(); + mqtt.setClientId("foo3"); + mqtt.setKeepAlive((short) 2); + final BlockingConnection connection = mqtt.blockingConnection(); + connection.connect(); + + int messagesToSend = 5; + + // publish + for (int i = 0; i < messagesToSend; ++i) { + connection.publish("jms/queue/test/foo", "hello world".getBytes(), QoS.AT_LEAST_ONCE, false); + } + + connection.disconnect(); + + for (int i = 0; i < messagesToSend; i++) { + + javax.jms.Message message = consumer.receive(2 * 1000); + assertNotNull(message); + assertTrue(message instanceof BytesMessage); + BytesMessage bytesMessage = (BytesMessage) message; + + int length = (int) bytesMessage.getBodyLength(); + byte[] buffer = new byte[length]; + bytesMessage.readBytes(buffer); + assertEquals("hello world", new String(buffer)); + } + + jmsConn.close(); + } @Test(timeout = 30 * 10000) public void testSubscribeMultipleTopics() throws Exception {