ARTEMIS-233 Remove MQTT Address PreFix for cross protocol support

This commit is contained in:
Martyn Taylor 2015-09-25 14:47:23 +01:00
parent a313558797
commit 3c7c2ed5d3
3 changed files with 95 additions and 89 deletions

View File

@ -24,6 +24,7 @@ import io.netty.handler.codec.mqtt.MqttMessageType;
import io.netty.handler.codec.mqtt.MqttPublishVariableHeader;
import io.netty.handler.codec.mqtt.MqttSubscribeMessage;
import io.netty.handler.codec.mqtt.MqttTopicSubscription;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper;
import org.apache.activemq.artemis.core.server.ServerMessage;
@ -52,8 +53,6 @@ public class MQTTUtil {
public static final int MAX_MESSAGE_SIZE = 268435455;
public static final String MQTT_ADDRESS_PREFIX = "$sys.mqtt.";
public static final String MQTT_RETAIN_ADDRESS_PREFIX = "$sys.mqtt.retain.";
public static final String MQTT_QOS_LEVEL_KEY = "mqtt.qos.level";
@ -67,16 +66,13 @@ public class MQTTUtil {
public static final int DEFAULT_KEEP_ALIVE_FREQUENCY = 5000;
public static String convertMQTTAddressFilterToCore(String filter) {
return MQTT_ADDRESS_PREFIX + swapMQTTAndCoreWildCards(filter);
return swapMQTTAndCoreWildCards(filter);
}
public static String convertCoreAddressFilterToMQTT(String filter) {
if (filter.startsWith(MQTT_RETAIN_ADDRESS_PREFIX)) {
filter = filter.substring(MQTT_RETAIN_ADDRESS_PREFIX.length(), filter.length());
}
else if (filter.startsWith(MQTT_ADDRESS_PREFIX)) {
filter = filter.substring(MQTT_ADDRESS_PREFIX.length(), filter.length());
}
return swapMQTTAndCoreWildCards(filter);
}
@ -117,6 +113,8 @@ public class MQTTUtil {
message.setAddress(address);
message.putBooleanProperty(new SimpleString(MQTT_MESSAGE_RETAIN_KEY), retain);
message.putIntProperty(new SimpleString(MQTT_QOS_LEVEL_KEY), qos);
// For JMS Consumption
message.setType(Message.BYTES_TYPE);
return message;
}

View File

@ -16,6 +16,10 @@
*/
package org.apache.activemq.artemis.tests.integration.mqtt.imported;
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import java.lang.reflect.Field;
import java.net.ProtocolException;
import java.util.ArrayList;
@ -46,7 +50,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.vertx.java.core.impl.ConcurrentHashSet;
/**
/**QT
* MQTT Test imported from ActiveMQ MQTT component.
*/
public class MQTTTest extends MQTTTestSupport {
@ -1001,91 +1005,93 @@ public class MQTTTest extends MQTTTestSupport {
outstanding task to add cross protocol support. This task should rework these tests. The tests are included here
and commented out to ensure ActiveMQ and Artemis tests are in sync. */
// @Test(timeout = 60 * 1000)
// public void testSendMQTTReceiveJMS() throws Exception {
// doTestSendMQTTReceiveJMS("foo.*");
// }
@Test(timeout = 60 * 1000)
public void testSendMQTTReceiveJMS() throws Exception {
doTestSendMQTTReceiveJMS("foo.*", "foo/bar");
}
// public void doTestSendMQTTReceiveJMS(String destinationName) throws Exception {
// final MQTTClientProvider provider = getMQTTClientProvider();
// initializeConnection(provider);
//
// // send retained message
// final String RETAINED = "RETAINED";
// provider.publish("foo/bah", RETAINED.getBytes(), AT_LEAST_ONCE, true);
//
// ActiveMQConnection activeMQConnection = (ActiveMQConnection) cf.createConnection();
// // MUST set to true to receive retained messages
// activeMQConnection.setUseRetroactiveConsumer(true);
// activeMQConnection.start();
// Session s = activeMQConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// javax.jms.Topic jmsTopic = s.createTopic(destinationName);
// MessageConsumer consumer = s.createConsumer(jmsTopic);
//
// // check whether we received retained message on JMS subscribe
// ActiveMQMessage message = (ActiveMQMessage) consumer.receive(5000);
// assertNotNull("Should get retained message", message);
// ByteSequence bs = message.getContent();
// assertEquals(RETAINED, new String(bs.data, bs.offset, bs.length));
// assertTrue(message.getBooleanProperty(RetainedMessageSubscriptionRecoveryPolicy.RETAINED_PROPERTY));
//
// for (int i = 0; i < NUM_MESSAGES; i++) {
// String payload = "Test Message: " + i;
// provider.publish("foo/bah", payload.getBytes(), AT_LEAST_ONCE);
// message = (ActiveMQMessage) consumer.receive(5000);
// assertNotNull("Should get a message", message);
// bs = message.getContent();
// assertEquals(payload, new String(bs.data, bs.offset, bs.length));
// }
//
// activeMQConnection.close();
// provider.disconnect();
// }
public void doTestSendMQTTReceiveJMS(String jmsTopicAddress, String mqttAddress) throws Exception {
final MQTTClientProvider provider = getMQTTClientProvider();
initializeConnection(provider);
// TODO As with other tests, this should be enabled as part of the cross protocol support with MQTT.
// @Test(timeout = 2 * 60 * 1000)
// public void testSendJMSReceiveMQTT() throws Exception {
// doTestSendJMSReceiveMQTT("foo.far");
// }
// send retained message
final String address = "jms/queue/" + mqttAddress;
final String RETAINED = "RETAINED";
// TODO As with other tests, this should be enabled as part of the cross protocol support with MQTT.
// public void doTestSendJMSReceiveMQTT(String destinationName) throws Exception {
// final MQTTClientProvider provider = getMQTTClientProvider();
// initializeConnection(provider);
//
// ActiveMQConnection activeMQConnection = (ActiveMQConnection) cf.createConnection();
// activeMQConnection.setUseRetroactiveConsumer(true);
// activeMQConnection.start();
// Session s = activeMQConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// javax.jms.Topic jmsTopic = s.createTopic(destinationName);
// MessageProducer producer = s.createProducer(jmsTopic);
//
// // send retained message from JMS
// final String RETAINED = "RETAINED";
// TextMessage sendMessage = s.createTextMessage(RETAINED);
// // mark the message to be retained
// sendMessage.setBooleanProperty(RetainedMessageSubscriptionRecoveryPolicy.RETAIN_PROPERTY, true);
// // MQTT QoS can be set using MQTTProtocolConverter.QOS_PROPERTY_NAME property
// sendMessage.setIntProperty(MQTTProtocolConverter.QOS_PROPERTY_NAME, 0);
// producer.send(sendMessage);
//
// provider.subscribe("foo/+", AT_MOST_ONCE);
// byte[] message = provider.receive(10000);
// assertNotNull("Should get retained message", message);
// assertEquals(RETAINED, new String(message));
//
// for (int i = 0; i < NUM_MESSAGES; i++) {
// String payload = "This is Test Message: " + i;
// sendMessage = s.createTextMessage(payload);
// producer.send(sendMessage);
// message = provider.receive(5000);
// assertNotNull("Should get a message", message);
//
// assertEquals(payload, new String(message));
// }
// provider.disconnect();
// activeMQConnection.close();
// }
final byte[] payload = RETAINED.getBytes();
Connection connection = cf.createConnection();
// MUST set to true to receive retained messages
connection.start();
Session s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
javax.jms.Queue jmsQueue = s.createQueue(jmsTopicAddress);
MessageConsumer consumer = s.createConsumer(jmsQueue);
provider.publish(address, RETAINED.getBytes(), AT_LEAST_ONCE, true);
// check whether we received retained message on JMS subscribe
BytesMessage message = (BytesMessage) consumer.receive(5000);
assertNotNull("Should get retained message", message);
byte[] b = new byte[8];
message.readBytes(b);
assertArrayEquals(payload, b);
for (int i = 0; i < NUM_MESSAGES; i++) {
String p = "Test Message: " + i;
provider.publish(address, p.getBytes(), AT_LEAST_ONCE);
message = (BytesMessage) consumer.receive(5000);
assertNotNull("Should get a message", message);
byte[] bytePayload = new byte[p.getBytes().length];
message.readBytes(bytePayload);
assertArrayEquals(payload, b);
}
connection.close();
provider.disconnect();
}
// @Test(timeout = 2 * 60 * 1000)
// public void testSendJMSReceiveMQTT() throws Exception {
// doTestSendJMSReceiveMQTT("foo.far");
// }
//
// public void doTestSendJMSReceiveMQTT(String destinationName) throws Exception {
// final MQTTClientProvider provider = getMQTTClientProvider();
// initializeConnection(provider);
//
// Connection connection = cf.createConnection();
// connection.start();
//
// Session s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// javax.jms.Queue queue = s.createQueue(destinationName);
// MessageProducer producer = s.createProducer(queue);
//
// // send retained message from JMS
// final String RETAINED = "RETAINED";
// TextMessage sendMessage = s.createTextMessage(RETAINED);
// sendMessage.setIntProperty(MQTTUtil.MQTT_QOS_LEVEL_KEY, 0);
// producer.send(sendMessage);
//
// provider.subscribe("jms/queue/foo/+", AT_MOST_ONCE);
// byte[] message = provider.receive(10000);
// assertNotNull("Should get retained message", message);
// assertEquals(RETAINED, new String(message));
//
// for (int i = 0; i < NUM_MESSAGES; i++) {
// String payload = "This is Test Message: " + i;
// sendMessage = s.createTextMessage(payload);
// producer.send(sendMessage);
// message = provider.receive(5000);
// assertNotNull("Should get a message", message);
//
// assertEquals(payload, new String(message));
// }
// provider.disconnect();
// connection.close();
// }
@Test(timeout = 60 * 1000)
public void testPingKeepsInactivityMonitorAlive() throws Exception {

View File

@ -125,6 +125,8 @@ public class MQTTTestSupport extends ActiveMQTestBase {
addMQTTConnector();
AddressSettings addressSettings = new AddressSettings();
addressSettings.setMaxSizeBytes(999999999);
addressSettings.setAutoCreateJmsQueues(true);
server.getAddressSettingsRepository().addMatch("#", addressSettings);
server.start();
server.waitForActivation(10, TimeUnit.SECONDS);