git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1341521 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2012-05-22 15:22:55 +00:00
parent 94b6b1115e
commit 1e3ea5c9c5
2 changed files with 105 additions and 27 deletions

View File

@ -261,8 +261,6 @@ class MQTTProtocolConverter {
QoS onSubscribe(SUBSCRIBE command, Topic topic) throws MQTTProtocolException { QoS onSubscribe(SUBSCRIBE command, Topic topic) throws MQTTProtocolException {
ActiveMQDestination destination = new ActiveMQTopic(convertMQTTToActiveMQ(topic.name().toString())); ActiveMQDestination destination = new ActiveMQTopic(convertMQTTToActiveMQ(topic.name().toString()));
if (destination == null) { if (destination == null) {
throw new MQTTProtocolException("Invalid Destination."); throw new MQTTProtocolException("Invalid Destination.");
} }
@ -458,32 +456,16 @@ class MQTTProtocolConverter {
} }
result.topicName(topicName); result.topicName(topicName);
ByteSequence byteSequence = message.getContent();
if (message.isCompressed()) {
Inflater inflater = new Inflater();
inflater.setInput(byteSequence.data, byteSequence.offset, byteSequence.length);
byte[] data = new byte[4096];
int read;
ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
while ((read = inflater.inflate(data, 0, data.length)) != 0) {
bytesOut.write(data, 0, read);
}
byteSequence = bytesOut.toByteSequence();
}
if (message.getDataStructureType() == ActiveMQTextMessage.DATA_STRUCTURE_TYPE) { if (message.getDataStructureType() == ActiveMQTextMessage.DATA_STRUCTURE_TYPE) {
if (byteSequence.getLength() > 4) {
byte[] content = new byte[byteSequence.getLength() - 4];
System.arraycopy(byteSequence.data, 4, content, 0, content.length);
result.payload(new Buffer(content));
} else {
ActiveMQTextMessage msg = (ActiveMQTextMessage) message.copy(); ActiveMQTextMessage msg = (ActiveMQTextMessage) message.copy();
msg.setReadOnlyBody(true);
String messageText = msg.getText(); String messageText = msg.getText();
if (messageText != null) { if (messageText != null) {
result.payload(new Buffer(msg.getText().getBytes("UTF-8"))); result.payload(new Buffer(messageText.getBytes("UTF-8")));
}
} }
} else if (message.getDataStructureType() == ActiveMQBytesMessage.DATA_STRUCTURE_TYPE) { } else if (message.getDataStructureType() == ActiveMQBytesMessage.DATA_STRUCTURE_TYPE) {
ActiveMQBytesMessage msg = (ActiveMQBytesMessage) message.copy(); ActiveMQBytesMessage msg = (ActiveMQBytesMessage) message.copy();
@ -491,8 +473,29 @@ class MQTTProtocolConverter {
byte[] data = new byte[(int) msg.getBodyLength()]; byte[] data = new byte[(int) msg.getBodyLength()];
msg.readBytes(data); msg.readBytes(data);
result.payload(new Buffer(data)); result.payload(new Buffer(data));
} else { } else if (message.getDataStructureType() == ActiveMQMapMessage.DATA_STRUCTURE_TYPE){
ActiveMQMapMessage msg = (ActiveMQMapMessage) message.copy();
msg.setReadOnlyBody(true);
Map map = msg.getContentMap();
if (map != null){
result.payload(new Buffer(map.toString().getBytes("UTF-8")));
}
}
else {
ByteSequence byteSequence = message.getContent();
if (byteSequence != null && byteSequence.getLength() > 0) { if (byteSequence != null && byteSequence.getLength() > 0) {
if (message.isCompressed()){
Inflater inflater = new Inflater();
inflater.setInput(byteSequence.data,byteSequence.offset,byteSequence.length);
byte[] data = new byte[4096];
int read;
ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
while((read = inflater.inflate(data)) != 0){
bytesOut.write(data,0,read);
}
byteSequence = bytesOut.toByteSequence();
}
result.payload(new Buffer(byteSequence.data, byteSequence.offset, byteSequence.length)); result.payload(new Buffer(byteSequence.data, byteSequence.offset, byteSequence.length));
} }
} }
@ -620,9 +623,9 @@ class MQTTProtocolConverter {
} }
private String convertMQTTToActiveMQ(String name) { private String convertMQTTToActiveMQ(String name) {
String result = name.replace('>', '#'); String result = name.replace('#', '>');
result = result.replace('*', '+'); result = result.replace('+', '*');
result = result.replace('.', '/'); result = result.replace('/', '.');
return result; return result;
} }
} }

View File

@ -17,9 +17,13 @@
package org.apache.activemq.transport.mqtt; package org.apache.activemq.transport.mqtt;
import java.util.Vector; import java.util.Vector;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import javax.jms.MessageConsumer; import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session; import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.BrokerService;
@ -62,6 +66,47 @@ public class MQTTTest {
} }
} }
@Test
public void testSendAndReceiveMQTT() throws Exception {
addMQTTConnector(brokerService);
brokerService.start();
MQTT mqtt = new MQTT();
final BlockingConnection subscribeConnection = mqtt.blockingConnection();
subscribeConnection.connect();
Topic topic = new Topic("foo/bah",QoS.AT_MOST_ONCE);
Topic[] topics = {topic};
subscribeConnection.subscribe(topics);
final CountDownLatch latch = new CountDownLatch(numberOfMessages);
Thread thread = new Thread(new Runnable() {
public void run() {
for (int i = 0; i < numberOfMessages; i++){
try {
Message message = subscribeConnection.receive();
message.ack();
latch.countDown();
} catch (Exception e) {
e.printStackTrace();
break;
}
}
}
});
thread.start();
BlockingConnection publisherConnection = mqtt.blockingConnection();
publisherConnection.connect();
for (int i = 0; i < numberOfMessages; i++){
String payload = "Message " + i;
publisherConnection.publish(topic.name().toString(),payload.getBytes(),QoS.AT_LEAST_ONCE,false);
}
latch.await(10, TimeUnit.SECONDS);
assertEquals(0, latch.getCount());
}
@Test @Test
public void testSendAndReceiveAtMostOnce() throws Exception { public void testSendAndReceiveAtMostOnce() throws Exception {
addMQTTConnector(brokerService); addMQTTConnector(brokerService);
@ -172,7 +217,7 @@ public class MQTTTest {
brokerService.start(); brokerService.start();
MQTT mqtt = createMQTTConnection(); MQTT mqtt = createMQTTConnection();
BlockingConnection connection = mqtt.blockingConnection(); BlockingConnection connection = mqtt.blockingConnection();
final String DESTINATION_NAME = "foo"; final String DESTINATION_NAME = "foo.*";
connection.connect(); connection.connect();
ActiveMQConnection activeMQConnection = (ActiveMQConnection) new ActiveMQConnectionFactory().createConnection(); ActiveMQConnection activeMQConnection = (ActiveMQConnection) new ActiveMQConnectionFactory().createConnection();
@ -183,7 +228,7 @@ public class MQTTTest {
for (int i = 0; i < numberOfMessages; i++) { for (int i = 0; i < numberOfMessages; i++) {
String payload = "Test Message: " + i; String payload = "Test Message: " + i;
connection.publish("foo", payload.getBytes(), QoS.AT_LEAST_ONCE, false); connection.publish("foo/bah", payload.getBytes(), QoS.AT_LEAST_ONCE, false);
ActiveMQMessage message = (ActiveMQMessage) consumer.receive(); ActiveMQMessage message = (ActiveMQMessage) consumer.receive();
ByteSequence bs = message.getContent(); ByteSequence bs = message.getContent();
assertEquals(payload, new String(bs.data, bs.offset, bs.length)); assertEquals(payload, new String(bs.data, bs.offset, bs.length));
@ -194,6 +239,36 @@ public class MQTTTest {
connection.disconnect(); connection.disconnect();
} }
@Test
public void testSendJMSReceiveMQTT() throws Exception {
addMQTTConnector(brokerService);
brokerService.addConnector(ActiveMQConnectionFactory.DEFAULT_BROKER_BIND_URL);
brokerService.start();
MQTT mqtt = createMQTTConnection();
mqtt.setKeepAlive(Short.MAX_VALUE);
BlockingConnection connection = mqtt.blockingConnection();
connection.connect();
ActiveMQConnection activeMQConnection = (ActiveMQConnection) new ActiveMQConnectionFactory().createConnection();
activeMQConnection.start();
Session s = activeMQConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
javax.jms.Topic jmsTopic = s.createTopic("foo.far");
MessageProducer producer = s.createProducer(jmsTopic);
Topic[] topics = {new Topic(utf8("foo/far"), QoS.AT_MOST_ONCE)};
connection.subscribe(topics);
for (int i = 0; i < numberOfMessages; i++) {
String payload = "This is Test Message: " + i;
TextMessage sendMessage = s.createTextMessage(payload);
producer.send(sendMessage);
Message message = connection.receive();
message.ack();
assertEquals(payload, new String(message.getPayload()));
}
connection.disconnect();
}
protected void addMQTTConnector(BrokerService brokerService) throws Exception { protected void addMQTTConnector(BrokerService brokerService) throws Exception {
brokerService.addConnector("mqtt://localhost:1883"); brokerService.addConnector("mqtt://localhost:1883");
} }