added test from mailing list to show MQTT --> JMS mapping works with BytesMessage

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1517556 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Christian Posta 2013-08-26 15:21:15 +00:00
parent 6585d5c874
commit e59ea6d03f
2 changed files with 59 additions and 3 deletions

View File

@ -41,6 +41,7 @@ import static org.junit.Assert.assertArrayEquals;
public abstract class AbstractMQTTTest extends AutoFailTestSupport { public abstract class AbstractMQTTTest extends AutoFailTestSupport {
protected TransportConnector mqttConnector; protected TransportConnector mqttConnector;
protected TransportConnector openwireConnector;
public static final int AT_MOST_ONCE =0; public static final int AT_MOST_ONCE =0;
public static final int AT_LEAST_ONCE = 1; public static final int AT_LEAST_ONCE = 1;
@ -382,6 +383,10 @@ public abstract class AbstractMQTTTest extends AutoFailTestSupport {
mqttConnector = brokerService.addConnector(getProtocolScheme()+"://localhost:0?" + config); mqttConnector = brokerService.addConnector(getProtocolScheme()+"://localhost:0?" + config);
} }
protected void addOpenwireConnector() throws Exception {
openwireConnector = brokerService.addConnector("tcp://localhost:0");
}
protected void initializeConnection(MQTTClientProvider provider) throws Exception { protected void initializeConnection(MQTTClientProvider provider) throws Exception {
provider.connect("tcp://localhost:"+mqttConnector.getConnectUri().getPort()); provider.connect("tcp://localhost:"+mqttConnector.getConnectUri().getPort());
} }

View File

@ -18,6 +18,7 @@ package org.apache.activemq.transport.mqtt;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.util.Wait; import org.apache.activemq.util.Wait;
import org.fusesource.mqtt.client.BlockingConnection; import org.fusesource.mqtt.client.BlockingConnection;
import org.fusesource.mqtt.client.MQTT; import org.fusesource.mqtt.client.MQTT;
@ -30,6 +31,8 @@ import org.junit.Test;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import javax.jms.*;
public class MQTTTest extends AbstractMQTTTest { public class MQTTTest extends AbstractMQTTTest {
private static final Logger LOG = LoggerFactory.getLogger(MQTTTest.class); private static final Logger LOG = LoggerFactory.getLogger(MQTTTest.class);
@ -76,8 +79,56 @@ public class MQTTTest extends AbstractMQTTTest {
connection.disconnect(); connection.disconnect();
} }
@Test(timeout = 300000)
public void testJmsMapping() throws Exception {
addMQTTConnector();
addOpenwireConnector();
brokerService.start();
// start up jms consumer
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:" + openwireConnector.getConnectUri().getPort());
Connection jmsConn = factory.createConnection();
Session session = jmsConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination dest = session.createTopic("test.foo");
MessageConsumer consumer = session.createConsumer(dest);
jmsConn.start();
// set up mqtt producer
MQTT mqtt = createMQTTConnection();
mqtt.setClientId("foo3");
mqtt.setKeepAlive((short)2);
final BlockingConnection connection = mqtt.blockingConnection();
connection.connect();
int messagesToSend = 5;
// publish
for (int i = 0; i < messagesToSend; ++i) {
connection.publish("test/foo", "hello world".getBytes(), QoS.AT_LEAST_ONCE, false);
}
connection.disconnect();
for (int i = 0; i < messagesToSend; i++) {
javax.jms.Message message = consumer.receive(2 * 1000);
assertNotNull(message);
assertTrue(message instanceof BytesMessage);
BytesMessage bytesMessage = (BytesMessage) message;
int length = (int) bytesMessage.getBodyLength();
byte[] buffer = new byte[length];
bytesMessage.readBytes(buffer);
assertEquals("hello world", new String(buffer));
}
jmsConn.close();
}
@Test(timeout = 300000) @Test(timeout = 300000)
public void testSubscribeMultipleTopics() throws Exception { public void testSubscribeMultipleTopics() throws Exception {
byte[] payload = new byte[1024 * 32]; byte[] payload = new byte[1024 * 32];
for (int i = 0; i < payload.length; i++) { for (int i = 0; i < payload.length; i++) {
payload[i] = '2'; payload[i] = '2';