Add STOMP-MQTT cross protocol tests

This commit is contained in:
Martyn Taylor 2016-05-25 20:32:03 +01:00 committed by Clebert Suconic
parent e453aae5c9
commit 8507a5c6f0
2 changed files with 59 additions and 2 deletions

View File

@ -33,9 +33,11 @@ import java.util.regex.Pattern;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.management.ResourceNames;
import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient;
import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
import org.apache.activemq.artemis.utils.RandomUtil;
import org.apache.activemq.artemis.core.protocol.stomp.Stomp;
import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
import org.apache.activemq.artemis.tests.integration.mqtt.imported.FuseMQTTClientProvider;
import org.apache.activemq.artemis.tests.integration.mqtt.imported.MQTTClientProvider;
import org.apache.activemq.artemis.utils.RandomUtil;
import org.junit.Assert;
import org.junit.Test;
@ -145,6 +147,59 @@ public class StompTest extends StompTestBase {
Assert.assertTrue(Math.abs(tnow - tmsg) < 1000);
}
@Test
public void sendSTOMPReceiveMQTT() throws Exception {
String address = "myTestAddress";
// Set up MQTT Subscription
MQTTClientProvider clientProvider = new FuseMQTTClientProvider();
clientProvider.connect("tcp://localhost:61616");
clientProvider.subscribe(address, 0);
String stompPayload = "This is a test message";
// Set up STOMP connection and send STOMP Message
String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
sendFrame(frame);
frame = "SEND\n" + "destination:" + address + "\n\n" + stompPayload + Stomp.NULL;
sendFrame(frame);
// Recieve MQTT Message
byte[] mqttPayload = clientProvider.receive(10000);
clientProvider.disconnect();
assertEquals(stompPayload, new String(mqttPayload, "UTF-8"));
clientProvider.disconnect();
}
@Test
public void sendMQTTReceiveSTOMP() throws Exception {
String address = "myTestAddress";
String payload = "This is a test message";
server.getActiveMQServer().createQueue(new SimpleString(address), new SimpleString(address), null, false, false);
// Set up STOMP subscription
String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
sendFrame(frame);
frame = "SUBSCRIBE\n" + "destination:" + address + "\n" + "ack:auto\n\n" + Stomp.NULL;
sendFrame(frame);
receiveFrame(1000);
// Send MQTT Message
MQTTClientProvider clientProvider = new FuseMQTTClientProvider();
clientProvider.connect("tcp://localhost:61616");
clientProvider.publish(address, payload.getBytes(), 0);
clientProvider.disconnect();
// Receive STOMP Message
frame = receiveFrame(1000);
assertTrue(frame.contains(payload));
}
@Test
public void testSendMessageToNonExistentQueue() throws Exception {
String nonExistentQueue = RandomUtil.randomString();

View File

@ -183,8 +183,10 @@ public abstract class StompTestBase extends ActiveMQTestBase {
params.put(TransportConstants.PORT_PROP_NAME, TransportConstants.DEFAULT_STOMP_PORT);
params.put(TransportConstants.STOMP_CONSUMERS_CREDIT, "-1");
TransportConfiguration stompTransport = new TransportConfiguration(NettyAcceptorFactory.class.getName(), params);
TransportConfiguration allTransport = new TransportConfiguration(NettyAcceptorFactory.class.getName());
Configuration config = createBasicConfig().setPersistenceEnabled(false).addAcceptorConfiguration(stompTransport).addAcceptorConfiguration(new TransportConfiguration(InVMAcceptorFactory.class.getName()));
config.addAcceptorConfiguration(allTransport);
ActiveMQServer activeMQServer = addServer(ActiveMQServers.newActiveMQServer(config, defUser, defPass));