Artemis-233 Support JMS BytesMessage -> MQTT

This commit is contained in:
Martyn Taylor 2016-05-25 17:57:38 +01:00
parent 3c7c2ed5d3
commit 0d8a565836
2 changed files with 82 additions and 89 deletions

View File

@ -223,7 +223,11 @@ public class MQTTPublishManager {
private int decideQoS(ServerMessage message, ServerConsumer consumer) {
int subscriptionQoS = session.getSubscriptionManager().getConsumerQoSLevels().get(consumer.getID());
int qos = message.getIntProperty(MQTTUtil.MQTT_QOS_LEVEL_KEY);
int qos = 2;
if (message.containsProperty(MQTTUtil.MQTT_QOS_LEVEL_KEY)) {
qos = message.getIntProperty(MQTTUtil.MQTT_QOS_LEVEL_KEY);
}
/* Subscription QoS is the maximum QoS the client is willing to receive for this subscription. If the message QoS
is less than the subscription QoS then use it, otherwise use the subscription qos). */

View File

@ -18,7 +18,9 @@ package org.apache.activemq.artemis.tests.integration.mqtt.imported;
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import java.lang.reflect.Field;
import java.net.ProtocolException;
@ -50,7 +52,8 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.vertx.java.core.impl.ConcurrentHashSet;
/**QT
/**
* QT
* MQTT Test imported from ActiveMQ MQTT component.
*/
public class MQTTTest extends MQTTTestSupport {
@ -1001,10 +1004,6 @@ public class MQTTTest extends MQTTTestSupport {
notClean.disconnect();
}
/* TODO These Cross protocol tests were imported from ActiveMQ and need reworking to apply to Artemis. There is an
outstanding task to add cross protocol support. This task should rework these tests. The tests are included here
and commented out to ensure ActiveMQ and Artemis tests are in sync. */
@Test(timeout = 60 * 1000)
public void testSendMQTTReceiveJMS() throws Exception {
doTestSendMQTTReceiveJMS("foo.*", "foo/bar");
@ -1053,45 +1052,36 @@ public class MQTTTest extends MQTTTestSupport {
provider.disconnect();
}
// @Test(timeout = 2 * 60 * 1000)
// public void testSendJMSReceiveMQTT() throws Exception {
// doTestSendJMSReceiveMQTT("foo.far");
// }
//
// public void doTestSendJMSReceiveMQTT(String destinationName) throws Exception {
// final MQTTClientProvider provider = getMQTTClientProvider();
// initializeConnection(provider);
//
// Connection connection = cf.createConnection();
// connection.start();
//
// Session s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// javax.jms.Queue queue = s.createQueue(destinationName);
// MessageProducer producer = s.createProducer(queue);
//
// // send retained message from JMS
// final String RETAINED = "RETAINED";
// TextMessage sendMessage = s.createTextMessage(RETAINED);
// sendMessage.setIntProperty(MQTTUtil.MQTT_QOS_LEVEL_KEY, 0);
// producer.send(sendMessage);
//
// provider.subscribe("jms/queue/foo/+", AT_MOST_ONCE);
// byte[] message = provider.receive(10000);
// assertNotNull("Should get retained message", message);
// assertEquals(RETAINED, new String(message));
//
// for (int i = 0; i < NUM_MESSAGES; i++) {
// String payload = "This is Test Message: " + i;
// sendMessage = s.createTextMessage(payload);
// producer.send(sendMessage);
// message = provider.receive(5000);
// assertNotNull("Should get a message", message);
//
// assertEquals(payload, new String(message));
// }
// provider.disconnect();
// connection.close();
// }
@Test(timeout = 2 * 60 * 1000)
public void testSendJMSReceiveMQTT() throws Exception {
doTestSendJMSReceiveMQTT("foo.far");
}
public void doTestSendJMSReceiveMQTT(String destinationName) throws Exception {
final MQTTClientProvider provider = getMQTTClientProvider();
initializeConnection(provider);
provider.subscribe("jms/queue/foo/+", AT_MOST_ONCE);
Connection connection = cf.createConnection();
connection.start();
Session s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
javax.jms.Queue queue = s.createQueue(destinationName);
MessageProducer producer = s.createProducer(queue);
// send retained message from JMS
final byte[] bytes = new byte[]{1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
BytesMessage bytesMessage = s.createBytesMessage();
bytesMessage.writeBytes(bytes);
producer.send(bytesMessage);
byte[] message = provider.receive(10000);
assertNotNull("Should get retained message", message);
assertArrayEquals(bytes, message);
provider.disconnect();
connection.close();
}
@Test(timeout = 60 * 1000)
public void testPingKeepsInactivityMonitorAlive() throws Exception {
@ -1237,52 +1227,51 @@ public class MQTTTest extends MQTTTestSupport {
connection2.disconnect();
}
// TODO As with other tests, this should be enabled as part of the cross protocol support with MQTT.
// @Test(timeout = 30 * 10000)
// public void testJmsMapping() throws Exception {
// doTestJmsMapping("test.foo");
// }
@Test(timeout = 30 * 10000)
public void testJmsMapping() throws Exception {
doTestJmsMapping("test.foo");
}
// TODO As with other tests, this should be enabled as part of the cross protocol support with MQTT.
// public void doTestJmsMapping(String destinationName) throws Exception {
// // start up jms consumer
// Connection jmsConn = cf.createConnection();
// Session session = jmsConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
// Destination dest = session.createTopic(destinationName);
// MessageConsumer consumer = session.createConsumer(dest);
// jmsConn.start();
//
// // set up mqtt producer
// MQTT mqtt = createMQTTConnection();
// mqtt.setClientId("foo3");
// mqtt.setKeepAlive((short) 2);
// final BlockingConnection connection = mqtt.blockingConnection();
// connection.connect();
//
// int messagesToSend = 5;
//
// // publish
// for (int i = 0; i < messagesToSend; ++i) {
// connection.publish("test/foo", "hello world".getBytes(), QoS.AT_LEAST_ONCE, false);
// }
//
// connection.disconnect();
//
// for (int i = 0; i < messagesToSend; i++) {
//
// javax.jms.Message message = consumer.receive(2 * 1000);
// assertNotNull(message);
// assertTrue(message instanceof BytesMessage);
// BytesMessage bytesMessage = (BytesMessage) message;
//
// int length = (int) bytesMessage.getBodyLength();
// byte[] buffer = new byte[length];
// bytesMessage.readBytes(buffer);
// assertEquals("hello world", new String(buffer));
// }
//
// jmsConn.close();
// }
public void doTestJmsMapping(String destinationName) throws Exception {
// start up jms consumer
Connection jmsConn = cf.createConnection();
Session session = jmsConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination dest = session.createQueue(destinationName);
MessageConsumer consumer = session.createConsumer(dest);
jmsConn.start();
// set up mqtt producer
MQTT mqtt = createMQTTConnection();
mqtt.setClientId("foo3");
mqtt.setKeepAlive((short) 2);
final BlockingConnection connection = mqtt.blockingConnection();
connection.connect();
int messagesToSend = 5;
// publish
for (int i = 0; i < messagesToSend; ++i) {
connection.publish("jms/queue/test/foo", "hello world".getBytes(), QoS.AT_LEAST_ONCE, false);
}
connection.disconnect();
for (int i = 0; i < messagesToSend; i++) {
javax.jms.Message message = consumer.receive(2 * 1000);
assertNotNull(message);
assertTrue(message instanceof BytesMessage);
BytesMessage bytesMessage = (BytesMessage) message;
int length = (int) bytesMessage.getBodyLength();
byte[] buffer = new byte[length];
bytesMessage.readBytes(buffer);
assertEquals("hello world", new String(buffer));
}
jmsConn.close();
}
@Test(timeout = 30 * 10000)
public void testSubscribeMultipleTopics() throws Exception {