This closes #543

This commit is contained in:
Clebert Suconic 2016-05-25 15:37:20 -04:00
commit fdaf9b22f5
3 changed files with 81 additions and 4 deletions

View File

@ -17,9 +17,13 @@
package org.apache.activemq.artemis.core.protocol.mqtt;
import java.io.UnsupportedEncodingException;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.EmptyByteBuf;
import io.netty.handler.codec.mqtt.MqttMessageType;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.io.IOCallback;
@ -216,8 +220,24 @@ public class MQTTPublishManager {
private void sendServerMessage(int messageId, ServerMessageImpl message, int deliveryCount, int qos) {
String address = MQTTUtil.convertCoreAddressFilterToMQTT(message.getAddress().toString());
ByteBuf payload = message.getBodyBufferDuplicate().byteBuf();
ByteBuf payload;
switch (message.getType()) {
case Message.TEXT_TYPE:
try {
SimpleString text = message.getBodyBuffer().readNullableSimpleString();
byte[] stringPayload = text.toString().getBytes("UTF-8");
payload = ByteBufAllocator.DEFAULT.buffer(stringPayload.length);
payload.writeBytes(stringPayload);
break;
}
catch (UnsupportedEncodingException e) {
e.printStackTrace();
// Do nothing default to sending raw bytes.
}
default:
payload = message.getBodyBufferDuplicate().byteBuf();
break;
}
session.getProtocolHandler().send(messageId, address, qos, payload, deliveryCount);
}

View File

@ -33,9 +33,11 @@ import java.util.regex.Pattern;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.management.ResourceNames;
import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient;
import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
import org.apache.activemq.artemis.utils.RandomUtil;
import org.apache.activemq.artemis.core.protocol.stomp.Stomp;
import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
import org.apache.activemq.artemis.tests.integration.mqtt.imported.FuseMQTTClientProvider;
import org.apache.activemq.artemis.tests.integration.mqtt.imported.MQTTClientProvider;
import org.apache.activemq.artemis.utils.RandomUtil;
import org.junit.Assert;
import org.junit.Test;
@ -145,6 +147,59 @@ public class StompTest extends StompTestBase {
Assert.assertTrue(Math.abs(tnow - tmsg) < 1000);
}
@Test
public void sendSTOMPReceiveMQTT() throws Exception {
String address = "myTestAddress";
// Set up MQTT Subscription
MQTTClientProvider clientProvider = new FuseMQTTClientProvider();
clientProvider.connect("tcp://localhost:61616");
clientProvider.subscribe(address, 0);
String stompPayload = "This is a test message";
// Set up STOMP connection and send STOMP Message
String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
sendFrame(frame);
frame = "SEND\n" + "destination:" + address + "\n\n" + stompPayload + Stomp.NULL;
sendFrame(frame);
// Recieve MQTT Message
byte[] mqttPayload = clientProvider.receive(10000);
clientProvider.disconnect();
assertEquals(stompPayload, new String(mqttPayload, "UTF-8"));
clientProvider.disconnect();
}
@Test
public void sendMQTTReceiveSTOMP() throws Exception {
String address = "myTestAddress";
String payload = "This is a test message";
server.getActiveMQServer().createQueue(new SimpleString(address), new SimpleString(address), null, false, false);
// Set up STOMP subscription
String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
sendFrame(frame);
frame = "SUBSCRIBE\n" + "destination:" + address + "\n" + "ack:auto\n\n" + Stomp.NULL;
sendFrame(frame);
receiveFrame(1000);
// Send MQTT Message
MQTTClientProvider clientProvider = new FuseMQTTClientProvider();
clientProvider.connect("tcp://localhost:61616");
clientProvider.publish(address, payload.getBytes(), 0);
clientProvider.disconnect();
// Receive STOMP Message
frame = receiveFrame(1000);
assertTrue(frame.contains(payload));
}
@Test
public void testSendMessageToNonExistentQueue() throws Exception {
String nonExistentQueue = RandomUtil.randomString();

View File

@ -183,8 +183,10 @@ public abstract class StompTestBase extends ActiveMQTestBase {
params.put(TransportConstants.PORT_PROP_NAME, TransportConstants.DEFAULT_STOMP_PORT);
params.put(TransportConstants.STOMP_CONSUMERS_CREDIT, "-1");
TransportConfiguration stompTransport = new TransportConfiguration(NettyAcceptorFactory.class.getName(), params);
TransportConfiguration allTransport = new TransportConfiguration(NettyAcceptorFactory.class.getName());
Configuration config = createBasicConfig().setPersistenceEnabled(false).addAcceptorConfiguration(stompTransport).addAcceptorConfiguration(new TransportConfiguration(InVMAcceptorFactory.class.getName()));
config.addAcceptorConfiguration(allTransport);
ActiveMQServer activeMQServer = addServer(ActiveMQServers.newActiveMQServer(config, defUser, defPass));