From 3c7c2ed5d34d705651c6797c52029205c43b301a Mon Sep 17 00:00:00 2001 From: Martyn Taylor Date: Fri, 25 Sep 2015 14:47:23 +0100 Subject: [PATCH] ARTEMIS-233 Remove MQTT Address PreFix for cross protocol support --- .../artemis/core/protocol/mqtt/MQTTUtil.java | 10 +- .../integration/mqtt/imported/MQTTTest.java | 172 +++++++++--------- .../mqtt/imported/MQTTTestSupport.java | 2 + 3 files changed, 95 insertions(+), 89 deletions(-) diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java index aa2262a07e..2313248ac7 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java @@ -24,6 +24,7 @@ import io.netty.handler.codec.mqtt.MqttMessageType; import io.netty.handler.codec.mqtt.MqttPublishVariableHeader; import io.netty.handler.codec.mqtt.MqttSubscribeMessage; import io.netty.handler.codec.mqtt.MqttTopicSubscription; +import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper; import org.apache.activemq.artemis.core.server.ServerMessage; @@ -52,8 +53,6 @@ public class MQTTUtil { public static final int MAX_MESSAGE_SIZE = 268435455; - public static final String MQTT_ADDRESS_PREFIX = "$sys.mqtt."; - public static final String MQTT_RETAIN_ADDRESS_PREFIX = "$sys.mqtt.retain."; public static final String MQTT_QOS_LEVEL_KEY = "mqtt.qos.level"; @@ -67,16 +66,13 @@ public class MQTTUtil { public static final int DEFAULT_KEEP_ALIVE_FREQUENCY = 5000; public static String convertMQTTAddressFilterToCore(String filter) { - return MQTT_ADDRESS_PREFIX + swapMQTTAndCoreWildCards(filter); + return swapMQTTAndCoreWildCards(filter); } public static String convertCoreAddressFilterToMQTT(String filter) { if (filter.startsWith(MQTT_RETAIN_ADDRESS_PREFIX)) { filter = filter.substring(MQTT_RETAIN_ADDRESS_PREFIX.length(), filter.length()); } - else if (filter.startsWith(MQTT_ADDRESS_PREFIX)) { - filter = filter.substring(MQTT_ADDRESS_PREFIX.length(), filter.length()); - } return swapMQTTAndCoreWildCards(filter); } @@ -117,6 +113,8 @@ public class MQTTUtil { message.setAddress(address); message.putBooleanProperty(new SimpleString(MQTT_MESSAGE_RETAIN_KEY), retain); message.putIntProperty(new SimpleString(MQTT_QOS_LEVEL_KEY), qos); + // For JMS Consumption + message.setType(Message.BYTES_TYPE); return message; } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java index f15af10bdd..257cf8f2bd 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java @@ -16,6 +16,10 @@ */ package org.apache.activemq.artemis.tests.integration.mqtt.imported; +import javax.jms.BytesMessage; +import javax.jms.Connection; +import javax.jms.MessageConsumer; +import javax.jms.Session; import java.lang.reflect.Field; import java.net.ProtocolException; import java.util.ArrayList; @@ -46,7 +50,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.vertx.java.core.impl.ConcurrentHashSet; -/** +/**QT * MQTT Test imported from ActiveMQ MQTT component. */ public class MQTTTest extends MQTTTestSupport { @@ -1001,91 +1005,93 @@ public class MQTTTest extends MQTTTestSupport { outstanding task to add cross protocol support. This task should rework these tests. The tests are included here and commented out to ensure ActiveMQ and Artemis tests are in sync. */ - // @Test(timeout = 60 * 1000) - // public void testSendMQTTReceiveJMS() throws Exception { - // doTestSendMQTTReceiveJMS("foo.*"); - // } + @Test(timeout = 60 * 1000) + public void testSendMQTTReceiveJMS() throws Exception { + doTestSendMQTTReceiveJMS("foo.*", "foo/bar"); + } - // public void doTestSendMQTTReceiveJMS(String destinationName) throws Exception { - // final MQTTClientProvider provider = getMQTTClientProvider(); - // initializeConnection(provider); - // - // // send retained message - // final String RETAINED = "RETAINED"; - // provider.publish("foo/bah", RETAINED.getBytes(), AT_LEAST_ONCE, true); - // - // ActiveMQConnection activeMQConnection = (ActiveMQConnection) cf.createConnection(); - // // MUST set to true to receive retained messages - // activeMQConnection.setUseRetroactiveConsumer(true); - // activeMQConnection.start(); - // Session s = activeMQConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); - // javax.jms.Topic jmsTopic = s.createTopic(destinationName); - // MessageConsumer consumer = s.createConsumer(jmsTopic); - // - // // check whether we received retained message on JMS subscribe - // ActiveMQMessage message = (ActiveMQMessage) consumer.receive(5000); - // assertNotNull("Should get retained message", message); - // ByteSequence bs = message.getContent(); - // assertEquals(RETAINED, new String(bs.data, bs.offset, bs.length)); - // assertTrue(message.getBooleanProperty(RetainedMessageSubscriptionRecoveryPolicy.RETAINED_PROPERTY)); - // - // for (int i = 0; i < NUM_MESSAGES; i++) { - // String payload = "Test Message: " + i; - // provider.publish("foo/bah", payload.getBytes(), AT_LEAST_ONCE); - // message = (ActiveMQMessage) consumer.receive(5000); - // assertNotNull("Should get a message", message); - // bs = message.getContent(); - // assertEquals(payload, new String(bs.data, bs.offset, bs.length)); - // } - // - // activeMQConnection.close(); - // provider.disconnect(); - // } + public void doTestSendMQTTReceiveJMS(String jmsTopicAddress, String mqttAddress) throws Exception { + final MQTTClientProvider provider = getMQTTClientProvider(); + initializeConnection(provider); - // TODO As with other tests, this should be enabled as part of the cross protocol support with MQTT. - // @Test(timeout = 2 * 60 * 1000) - // public void testSendJMSReceiveMQTT() throws Exception { - // doTestSendJMSReceiveMQTT("foo.far"); - // } + // send retained message + final String address = "jms/queue/" + mqttAddress; + final String RETAINED = "RETAINED"; - // TODO As with other tests, this should be enabled as part of the cross protocol support with MQTT. - // public void doTestSendJMSReceiveMQTT(String destinationName) throws Exception { - // final MQTTClientProvider provider = getMQTTClientProvider(); - // initializeConnection(provider); - // - // ActiveMQConnection activeMQConnection = (ActiveMQConnection) cf.createConnection(); - // activeMQConnection.setUseRetroactiveConsumer(true); - // activeMQConnection.start(); - // Session s = activeMQConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); - // javax.jms.Topic jmsTopic = s.createTopic(destinationName); - // MessageProducer producer = s.createProducer(jmsTopic); - // - // // send retained message from JMS - // final String RETAINED = "RETAINED"; - // TextMessage sendMessage = s.createTextMessage(RETAINED); - // // mark the message to be retained - // sendMessage.setBooleanProperty(RetainedMessageSubscriptionRecoveryPolicy.RETAIN_PROPERTY, true); - // // MQTT QoS can be set using MQTTProtocolConverter.QOS_PROPERTY_NAME property - // sendMessage.setIntProperty(MQTTProtocolConverter.QOS_PROPERTY_NAME, 0); - // producer.send(sendMessage); - // - // provider.subscribe("foo/+", AT_MOST_ONCE); - // byte[] message = provider.receive(10000); - // assertNotNull("Should get retained message", message); - // assertEquals(RETAINED, new String(message)); - // - // for (int i = 0; i < NUM_MESSAGES; i++) { - // String payload = "This is Test Message: " + i; - // sendMessage = s.createTextMessage(payload); - // producer.send(sendMessage); - // message = provider.receive(5000); - // assertNotNull("Should get a message", message); - // - // assertEquals(payload, new String(message)); - // } - // provider.disconnect(); - // activeMQConnection.close(); - // } + final byte[] payload = RETAINED.getBytes(); + + Connection connection = cf.createConnection(); + // MUST set to true to receive retained messages + connection.start(); + + Session s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + javax.jms.Queue jmsQueue = s.createQueue(jmsTopicAddress); + MessageConsumer consumer = s.createConsumer(jmsQueue); + + provider.publish(address, RETAINED.getBytes(), AT_LEAST_ONCE, true); + + // check whether we received retained message on JMS subscribe + BytesMessage message = (BytesMessage) consumer.receive(5000); + assertNotNull("Should get retained message", message); + + byte[] b = new byte[8]; + message.readBytes(b); + assertArrayEquals(payload, b); + + for (int i = 0; i < NUM_MESSAGES; i++) { + String p = "Test Message: " + i; + provider.publish(address, p.getBytes(), AT_LEAST_ONCE); + message = (BytesMessage) consumer.receive(5000); + assertNotNull("Should get a message", message); + + byte[] bytePayload = new byte[p.getBytes().length]; + message.readBytes(bytePayload); + assertArrayEquals(payload, b); + } + + connection.close(); + provider.disconnect(); + } + +// @Test(timeout = 2 * 60 * 1000) +// public void testSendJMSReceiveMQTT() throws Exception { +// doTestSendJMSReceiveMQTT("foo.far"); +// } +// +// public void doTestSendJMSReceiveMQTT(String destinationName) throws Exception { +// final MQTTClientProvider provider = getMQTTClientProvider(); +// initializeConnection(provider); +// +// Connection connection = cf.createConnection(); +// connection.start(); +// +// Session s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); +// javax.jms.Queue queue = s.createQueue(destinationName); +// MessageProducer producer = s.createProducer(queue); +// +// // send retained message from JMS +// final String RETAINED = "RETAINED"; +// TextMessage sendMessage = s.createTextMessage(RETAINED); +// sendMessage.setIntProperty(MQTTUtil.MQTT_QOS_LEVEL_KEY, 0); +// producer.send(sendMessage); +// +// provider.subscribe("jms/queue/foo/+", AT_MOST_ONCE); +// byte[] message = provider.receive(10000); +// assertNotNull("Should get retained message", message); +// assertEquals(RETAINED, new String(message)); +// +// for (int i = 0; i < NUM_MESSAGES; i++) { +// String payload = "This is Test Message: " + i; +// sendMessage = s.createTextMessage(payload); +// producer.send(sendMessage); +// message = provider.receive(5000); +// assertNotNull("Should get a message", message); +// +// assertEquals(payload, new String(message)); +// } +// provider.disconnect(); +// connection.close(); +// } @Test(timeout = 60 * 1000) public void testPingKeepsInactivityMonitorAlive() throws Exception { diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTestSupport.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTestSupport.java index 61fcec0339..73489af0f1 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTestSupport.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTestSupport.java @@ -125,6 +125,8 @@ public class MQTTTestSupport extends ActiveMQTestBase { addMQTTConnector(); AddressSettings addressSettings = new AddressSettings(); addressSettings.setMaxSizeBytes(999999999); + addressSettings.setAutoCreateJmsQueues(true); + server.getAddressSettingsRepository().addMatch("#", addressSettings); server.start(); server.waitForActivation(10, TimeUnit.SECONDS);