mirror of https://github.com/apache/activemq.git
Adding an mqtt test that uses the eclipse paho client.
This commit is contained in:
parent
b0b3a169ce
commit
bc9751ac23
|
@ -134,7 +134,11 @@
|
||||||
<type>test-jar</type>
|
<type>test-jar</type>
|
||||||
<scope>test</scope>
|
<scope>test</scope>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.eclipse.paho</groupId>
|
||||||
|
<artifactId>mqtt-client</artifactId>
|
||||||
|
<scope>test</scope>
|
||||||
|
</dependency>
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>junit</groupId>
|
<groupId>junit</groupId>
|
||||||
<artifactId>junit</artifactId>
|
<artifactId>junit</artifactId>
|
||||||
|
|
|
@ -77,300 +77,6 @@ public abstract class AbstractMQTTTest extends AutoFailTestSupport {
|
||||||
super.tearDown();
|
super.tearDown();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@Test(timeout=300000)
|
|
||||||
public void testSendAndReceiveMQTT() throws Exception {
|
|
||||||
addMQTTConnector();
|
|
||||||
brokerService.start();
|
|
||||||
final MQTTClientProvider subscriptionProvider = getMQTTClientProvider();
|
|
||||||
initializeConnection(subscriptionProvider);
|
|
||||||
|
|
||||||
|
|
||||||
subscriptionProvider.subscribe("foo/bah",AT_MOST_ONCE);
|
|
||||||
|
|
||||||
final CountDownLatch latch = new CountDownLatch(numberOfMessages);
|
|
||||||
|
|
||||||
Thread thread = new Thread(new Runnable() {
|
|
||||||
@Override
|
|
||||||
public void run() {
|
|
||||||
for (int i = 0; i < numberOfMessages; i++){
|
|
||||||
try {
|
|
||||||
byte[] payload = subscriptionProvider.receive(10000);
|
|
||||||
assertNotNull("Should get a message", payload);
|
|
||||||
latch.countDown();
|
|
||||||
} catch (Exception e) {
|
|
||||||
e.printStackTrace();
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
}
|
|
||||||
});
|
|
||||||
thread.start();
|
|
||||||
|
|
||||||
final MQTTClientProvider publishProvider = getMQTTClientProvider();
|
|
||||||
initializeConnection(publishProvider);
|
|
||||||
|
|
||||||
for (int i = 0; i < numberOfMessages; i++){
|
|
||||||
String payload = "Message " + i;
|
|
||||||
publishProvider.publish("foo/bah",payload.getBytes(),AT_LEAST_ONCE);
|
|
||||||
}
|
|
||||||
|
|
||||||
latch.await(10, TimeUnit.SECONDS);
|
|
||||||
assertEquals(0, latch.getCount());
|
|
||||||
subscriptionProvider.disconnect();
|
|
||||||
publishProvider.disconnect();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test(timeout=300000)
|
|
||||||
public void testUnsubscribeMQTT() throws Exception {
|
|
||||||
addMQTTConnector();
|
|
||||||
brokerService.start();
|
|
||||||
final MQTTClientProvider subscriptionProvider = getMQTTClientProvider();
|
|
||||||
initializeConnection(subscriptionProvider);
|
|
||||||
|
|
||||||
String topic = "foo/bah";
|
|
||||||
|
|
||||||
subscriptionProvider.subscribe(topic,AT_MOST_ONCE);
|
|
||||||
|
|
||||||
final CountDownLatch latch = new CountDownLatch(numberOfMessages/2);
|
|
||||||
|
|
||||||
Thread thread = new Thread(new Runnable() {
|
|
||||||
@Override
|
|
||||||
public void run() {
|
|
||||||
for (int i = 0; i < numberOfMessages; i++){
|
|
||||||
try {
|
|
||||||
byte[] payload = subscriptionProvider.receive(10000);
|
|
||||||
assertNotNull("Should get a message", payload);
|
|
||||||
latch.countDown();
|
|
||||||
} catch (Exception e) {
|
|
||||||
e.printStackTrace();
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
}
|
|
||||||
});
|
|
||||||
thread.start();
|
|
||||||
|
|
||||||
final MQTTClientProvider publishProvider = getMQTTClientProvider();
|
|
||||||
initializeConnection(publishProvider);
|
|
||||||
|
|
||||||
for (int i = 0; i < numberOfMessages; i++){
|
|
||||||
String payload = "Message " + i;
|
|
||||||
if (i == numberOfMessages/2){
|
|
||||||
subscriptionProvider.unsubscribe(topic);
|
|
||||||
}
|
|
||||||
publishProvider.publish(topic,payload.getBytes(),AT_LEAST_ONCE);
|
|
||||||
}
|
|
||||||
|
|
||||||
latch.await(10, TimeUnit.SECONDS);
|
|
||||||
assertEquals(0, latch.getCount());
|
|
||||||
subscriptionProvider.disconnect();
|
|
||||||
publishProvider.disconnect();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test(timeout=300000)
|
|
||||||
public void testSendAtMostOnceReceiveExactlyOnce() throws Exception {
|
|
||||||
/**
|
|
||||||
* Although subscribing with EXACTLY ONCE, the message gets published
|
|
||||||
* with AT_MOST_ONCE - in MQTT the QoS is always determined by the message
|
|
||||||
* as published - not the wish of the subscriber
|
|
||||||
*/
|
|
||||||
addMQTTConnector();
|
|
||||||
brokerService.start();
|
|
||||||
|
|
||||||
final MQTTClientProvider provider = getMQTTClientProvider();
|
|
||||||
initializeConnection(provider);
|
|
||||||
provider.subscribe("foo",EXACTLY_ONCE);
|
|
||||||
for (int i = 0; i < numberOfMessages; i++) {
|
|
||||||
String payload = "Test Message: " + i;
|
|
||||||
provider.publish("foo", payload.getBytes(), AT_MOST_ONCE);
|
|
||||||
byte[] message = provider.receive(5000);
|
|
||||||
assertNotNull("Should get a message", message);
|
|
||||||
assertEquals(payload, new String(message));
|
|
||||||
}
|
|
||||||
provider.disconnect();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test(timeout=300000)
|
|
||||||
public void testSendAtLeastOnceReceiveExactlyOnce() throws Exception {
|
|
||||||
addMQTTConnector();
|
|
||||||
brokerService.start();
|
|
||||||
|
|
||||||
final MQTTClientProvider provider = getMQTTClientProvider();
|
|
||||||
initializeConnection(provider);
|
|
||||||
provider.subscribe("foo",EXACTLY_ONCE);
|
|
||||||
for (int i = 0; i < numberOfMessages; i++) {
|
|
||||||
String payload = "Test Message: " + i;
|
|
||||||
provider.publish("foo", payload.getBytes(), AT_LEAST_ONCE);
|
|
||||||
byte[] message = provider.receive(5000);
|
|
||||||
assertNotNull("Should get a message", message);
|
|
||||||
assertEquals(payload, new String(message));
|
|
||||||
}
|
|
||||||
provider.disconnect();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test(timeout=300000)
|
|
||||||
public void testSendAtLeastOnceReceiveAtMostOnce() throws Exception {
|
|
||||||
addMQTTConnector();
|
|
||||||
brokerService.start();
|
|
||||||
|
|
||||||
final MQTTClientProvider provider = getMQTTClientProvider();
|
|
||||||
initializeConnection(provider);
|
|
||||||
provider.subscribe("foo",AT_MOST_ONCE);
|
|
||||||
for (int i = 0; i < numberOfMessages; i++) {
|
|
||||||
String payload = "Test Message: " + i;
|
|
||||||
provider.publish("foo", payload.getBytes(), AT_LEAST_ONCE);
|
|
||||||
byte[] message = provider.receive(5000);
|
|
||||||
assertNotNull("Should get a message", message);
|
|
||||||
assertEquals(payload, new String(message));
|
|
||||||
}
|
|
||||||
provider.disconnect();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test(timeout=300000)
|
|
||||||
public void testSendAndReceiveAtMostOnce() throws Exception {
|
|
||||||
addMQTTConnector();
|
|
||||||
brokerService.start();
|
|
||||||
|
|
||||||
final MQTTClientProvider provider = getMQTTClientProvider();
|
|
||||||
initializeConnection(provider);
|
|
||||||
provider.subscribe("foo",AT_MOST_ONCE);
|
|
||||||
for (int i = 0; i < numberOfMessages; i++) {
|
|
||||||
String payload = "Test Message: " + i;
|
|
||||||
provider.publish("foo", payload.getBytes(), AT_MOST_ONCE);
|
|
||||||
byte[] message = provider.receive(5000);
|
|
||||||
assertNotNull("Should get a message", message);
|
|
||||||
assertEquals(payload, new String(message));
|
|
||||||
}
|
|
||||||
provider.disconnect();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test(timeout=300000)
|
|
||||||
public void testSendAndReceiveAtLeastOnce() throws Exception {
|
|
||||||
addMQTTConnector();
|
|
||||||
brokerService.start();
|
|
||||||
|
|
||||||
final MQTTClientProvider provider = getMQTTClientProvider();
|
|
||||||
initializeConnection(provider);
|
|
||||||
provider.subscribe("foo",AT_LEAST_ONCE);
|
|
||||||
for (int i = 0; i < numberOfMessages; i++) {
|
|
||||||
String payload = "Test Message: " + i;
|
|
||||||
provider.publish("foo", payload.getBytes(), AT_LEAST_ONCE);
|
|
||||||
byte[] message = provider.receive(5000);
|
|
||||||
assertNotNull("Should get a message", message);
|
|
||||||
assertEquals(payload, new String(message));
|
|
||||||
}
|
|
||||||
provider.disconnect();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test(timeout=300000)
|
|
||||||
public void testSendAndReceiveExactlyOnce() throws Exception {
|
|
||||||
addMQTTConnector();
|
|
||||||
brokerService.start();
|
|
||||||
final MQTTClientProvider publisher = getMQTTClientProvider();
|
|
||||||
initializeConnection(publisher);
|
|
||||||
|
|
||||||
final MQTTClientProvider subscriber = getMQTTClientProvider();
|
|
||||||
initializeConnection(subscriber);
|
|
||||||
|
|
||||||
subscriber.subscribe("foo",EXACTLY_ONCE);
|
|
||||||
for (int i = 0; i < numberOfMessages; i++) {
|
|
||||||
String payload = "Test Message: " + i;
|
|
||||||
publisher.publish("foo", payload.getBytes(), EXACTLY_ONCE);
|
|
||||||
byte[] message = subscriber.receive(5000);
|
|
||||||
assertNotNull("Should get a message + ["+ i + "]", message);
|
|
||||||
assertEquals(payload, new String(message));
|
|
||||||
}
|
|
||||||
subscriber.disconnect();
|
|
||||||
publisher.disconnect();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test(timeout=300000)
|
|
||||||
public void testSendAndReceiveLargeMessages() throws Exception {
|
|
||||||
byte[] payload = new byte[1024 * 32];
|
|
||||||
for (int i = 0; i < payload.length; i++){
|
|
||||||
payload[i] = '2';
|
|
||||||
}
|
|
||||||
addMQTTConnector();
|
|
||||||
brokerService.start();
|
|
||||||
|
|
||||||
final MQTTClientProvider publisher = getMQTTClientProvider();
|
|
||||||
initializeConnection(publisher);
|
|
||||||
|
|
||||||
final MQTTClientProvider subscriber = getMQTTClientProvider();
|
|
||||||
initializeConnection(subscriber);
|
|
||||||
|
|
||||||
subscriber.subscribe("foo",AT_LEAST_ONCE);
|
|
||||||
for (int i = 0; i < 10; i++) {
|
|
||||||
publisher.publish("foo", payload, AT_LEAST_ONCE);
|
|
||||||
byte[] message = subscriber.receive(5000);
|
|
||||||
assertNotNull("Should get a message", message);
|
|
||||||
|
|
||||||
assertArrayEquals(payload, message);
|
|
||||||
}
|
|
||||||
subscriber.disconnect();
|
|
||||||
publisher.disconnect();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test(timeout=300000)
|
|
||||||
public void testSendMQTTReceiveJMS() throws Exception {
|
|
||||||
addMQTTConnector();
|
|
||||||
TransportConnector openwireTransport = brokerService.addConnector("tcp://localhost:0");
|
|
||||||
brokerService.start();
|
|
||||||
|
|
||||||
final MQTTClientProvider provider = getMQTTClientProvider();
|
|
||||||
initializeConnection(provider);
|
|
||||||
final String DESTINATION_NAME = "foo.*";
|
|
||||||
|
|
||||||
ActiveMQConnection activeMQConnection = (ActiveMQConnection) new ActiveMQConnectionFactory(openwireTransport.getConnectUri()).createConnection();
|
|
||||||
activeMQConnection.start();
|
|
||||||
Session s = activeMQConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
|
||||||
javax.jms.Topic jmsTopic = s.createTopic(DESTINATION_NAME);
|
|
||||||
MessageConsumer consumer = s.createConsumer(jmsTopic);
|
|
||||||
|
|
||||||
for (int i = 0; i < numberOfMessages; i++) {
|
|
||||||
String payload = "Test Message: " + i;
|
|
||||||
provider.publish("foo/bah", payload.getBytes(), AT_LEAST_ONCE);
|
|
||||||
ActiveMQMessage message = (ActiveMQMessage) consumer.receive(5000);
|
|
||||||
assertNotNull("Should get a message", message);
|
|
||||||
ByteSequence bs = message.getContent();
|
|
||||||
assertEquals(payload, new String(bs.data, bs.offset, bs.length));
|
|
||||||
}
|
|
||||||
|
|
||||||
activeMQConnection.close();
|
|
||||||
provider.disconnect();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test(timeout=300000)
|
|
||||||
public void testSendJMSReceiveMQTT() throws Exception {
|
|
||||||
addMQTTConnector();
|
|
||||||
TransportConnector openwireTransport = brokerService.addConnector("tcp://localhost:0");
|
|
||||||
brokerService.start();
|
|
||||||
final MQTTClientProvider provider = getMQTTClientProvider();
|
|
||||||
initializeConnection(provider);
|
|
||||||
|
|
||||||
ActiveMQConnection activeMQConnection = (ActiveMQConnection) new ActiveMQConnectionFactory(openwireTransport.getConnectUri()).createConnection();
|
|
||||||
activeMQConnection.start();
|
|
||||||
Session s = activeMQConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
|
||||||
javax.jms.Topic jmsTopic = s.createTopic("foo.far");
|
|
||||||
MessageProducer producer = s.createProducer(jmsTopic);
|
|
||||||
|
|
||||||
provider.subscribe("foo/+",AT_MOST_ONCE);
|
|
||||||
for (int i = 0; i < numberOfMessages; i++) {
|
|
||||||
String payload = "This is Test Message: " + i;
|
|
||||||
TextMessage sendMessage = s.createTextMessage(payload);
|
|
||||||
producer.send(sendMessage);
|
|
||||||
byte[] message = provider.receive(5000);
|
|
||||||
assertNotNull("Should get a message", message);
|
|
||||||
|
|
||||||
assertEquals(payload, new String(message));
|
|
||||||
}
|
|
||||||
provider.disconnect();
|
|
||||||
activeMQConnection.close();
|
|
||||||
}
|
|
||||||
|
|
||||||
protected String getProtocolScheme() {
|
protected String getProtocolScheme() {
|
||||||
return "mqtt";
|
return "mqtt";
|
||||||
}
|
}
|
||||||
|
@ -391,5 +97,4 @@ public abstract class AbstractMQTTTest extends AutoFailTestSupport {
|
||||||
provider.connect("tcp://localhost:"+mqttConnector.getConnectUri().getPort());
|
provider.connect("tcp://localhost:"+mqttConnector.getConnectUri().getPort());
|
||||||
}
|
}
|
||||||
|
|
||||||
protected abstract MQTTClientProvider getMQTTClientProvider();
|
|
||||||
}
|
}
|
|
@ -16,9 +16,14 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.activemq.transport.mqtt;
|
package org.apache.activemq.transport.mqtt;
|
||||||
|
|
||||||
|
import java.util.concurrent.CountDownLatch;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
import org.apache.activemq.ActiveMQConnection;
|
||||||
import org.apache.activemq.ActiveMQConnectionFactory;
|
import org.apache.activemq.ActiveMQConnectionFactory;
|
||||||
|
import org.apache.activemq.broker.TransportConnector;
|
||||||
|
import org.apache.activemq.command.ActiveMQMessage;
|
||||||
|
import org.apache.activemq.util.ByteSequence;
|
||||||
import org.apache.activemq.util.Wait;
|
import org.apache.activemq.util.Wait;
|
||||||
import org.fusesource.mqtt.client.BlockingConnection;
|
import org.fusesource.mqtt.client.BlockingConnection;
|
||||||
import org.fusesource.mqtt.client.MQTT;
|
import org.fusesource.mqtt.client.MQTT;
|
||||||
|
@ -33,10 +38,305 @@ import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import javax.jms.*;
|
import javax.jms.*;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertArrayEquals;
|
||||||
|
|
||||||
public class MQTTTest extends AbstractMQTTTest {
|
public class MQTTTest extends AbstractMQTTTest {
|
||||||
|
|
||||||
private static final Logger LOG = LoggerFactory.getLogger(MQTTTest.class);
|
private static final Logger LOG = LoggerFactory.getLogger(MQTTTest.class);
|
||||||
|
|
||||||
|
@Test(timeout=300000)
|
||||||
|
public void testSendAndReceiveMQTT() throws Exception {
|
||||||
|
addMQTTConnector();
|
||||||
|
brokerService.start();
|
||||||
|
final MQTTClientProvider subscriptionProvider = getMQTTClientProvider();
|
||||||
|
initializeConnection(subscriptionProvider);
|
||||||
|
|
||||||
|
|
||||||
|
subscriptionProvider.subscribe("foo/bah",AT_MOST_ONCE);
|
||||||
|
|
||||||
|
final CountDownLatch latch = new CountDownLatch(numberOfMessages);
|
||||||
|
|
||||||
|
Thread thread = new Thread(new Runnable() {
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
for (int i = 0; i < numberOfMessages; i++){
|
||||||
|
try {
|
||||||
|
byte[] payload = subscriptionProvider.receive(10000);
|
||||||
|
assertNotNull("Should get a message", payload);
|
||||||
|
latch.countDown();
|
||||||
|
} catch (Exception e) {
|
||||||
|
e.printStackTrace();
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
thread.start();
|
||||||
|
|
||||||
|
final MQTTClientProvider publishProvider = getMQTTClientProvider();
|
||||||
|
initializeConnection(publishProvider);
|
||||||
|
|
||||||
|
for (int i = 0; i < numberOfMessages; i++){
|
||||||
|
String payload = "Message " + i;
|
||||||
|
publishProvider.publish("foo/bah",payload.getBytes(),AT_LEAST_ONCE);
|
||||||
|
}
|
||||||
|
|
||||||
|
latch.await(10, TimeUnit.SECONDS);
|
||||||
|
assertEquals(0, latch.getCount());
|
||||||
|
subscriptionProvider.disconnect();
|
||||||
|
publishProvider.disconnect();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(timeout=300000)
|
||||||
|
public void testUnsubscribeMQTT() throws Exception {
|
||||||
|
addMQTTConnector();
|
||||||
|
brokerService.start();
|
||||||
|
final MQTTClientProvider subscriptionProvider = getMQTTClientProvider();
|
||||||
|
initializeConnection(subscriptionProvider);
|
||||||
|
|
||||||
|
String topic = "foo/bah";
|
||||||
|
|
||||||
|
subscriptionProvider.subscribe(topic,AT_MOST_ONCE);
|
||||||
|
|
||||||
|
final CountDownLatch latch = new CountDownLatch(numberOfMessages/2);
|
||||||
|
|
||||||
|
Thread thread = new Thread(new Runnable() {
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
for (int i = 0; i < numberOfMessages; i++){
|
||||||
|
try {
|
||||||
|
byte[] payload = subscriptionProvider.receive(10000);
|
||||||
|
assertNotNull("Should get a message", payload);
|
||||||
|
latch.countDown();
|
||||||
|
} catch (Exception e) {
|
||||||
|
e.printStackTrace();
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
thread.start();
|
||||||
|
|
||||||
|
final MQTTClientProvider publishProvider = getMQTTClientProvider();
|
||||||
|
initializeConnection(publishProvider);
|
||||||
|
|
||||||
|
for (int i = 0; i < numberOfMessages; i++){
|
||||||
|
String payload = "Message " + i;
|
||||||
|
if (i == numberOfMessages/2){
|
||||||
|
subscriptionProvider.unsubscribe(topic);
|
||||||
|
}
|
||||||
|
publishProvider.publish(topic,payload.getBytes(),AT_LEAST_ONCE);
|
||||||
|
}
|
||||||
|
|
||||||
|
latch.await(10, TimeUnit.SECONDS);
|
||||||
|
assertEquals(0, latch.getCount());
|
||||||
|
subscriptionProvider.disconnect();
|
||||||
|
publishProvider.disconnect();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(timeout=300000)
|
||||||
|
public void testSendAtMostOnceReceiveExactlyOnce() throws Exception {
|
||||||
|
/**
|
||||||
|
* Although subscribing with EXACTLY ONCE, the message gets published
|
||||||
|
* with AT_MOST_ONCE - in MQTT the QoS is always determined by the message
|
||||||
|
* as published - not the wish of the subscriber
|
||||||
|
*/
|
||||||
|
addMQTTConnector();
|
||||||
|
brokerService.start();
|
||||||
|
|
||||||
|
final MQTTClientProvider provider = getMQTTClientProvider();
|
||||||
|
initializeConnection(provider);
|
||||||
|
provider.subscribe("foo",EXACTLY_ONCE);
|
||||||
|
for (int i = 0; i < numberOfMessages; i++) {
|
||||||
|
String payload = "Test Message: " + i;
|
||||||
|
provider.publish("foo", payload.getBytes(), AT_MOST_ONCE);
|
||||||
|
byte[] message = provider.receive(5000);
|
||||||
|
assertNotNull("Should get a message", message);
|
||||||
|
assertEquals(payload, new String(message));
|
||||||
|
}
|
||||||
|
provider.disconnect();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(timeout=300000)
|
||||||
|
public void testSendAtLeastOnceReceiveExactlyOnce() throws Exception {
|
||||||
|
addMQTTConnector();
|
||||||
|
brokerService.start();
|
||||||
|
|
||||||
|
final MQTTClientProvider provider = getMQTTClientProvider();
|
||||||
|
initializeConnection(provider);
|
||||||
|
provider.subscribe("foo",EXACTLY_ONCE);
|
||||||
|
for (int i = 0; i < numberOfMessages; i++) {
|
||||||
|
String payload = "Test Message: " + i;
|
||||||
|
provider.publish("foo", payload.getBytes(), AT_LEAST_ONCE);
|
||||||
|
byte[] message = provider.receive(5000);
|
||||||
|
assertNotNull("Should get a message", message);
|
||||||
|
assertEquals(payload, new String(message));
|
||||||
|
}
|
||||||
|
provider.disconnect();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(timeout=300000)
|
||||||
|
public void testSendAtLeastOnceReceiveAtMostOnce() throws Exception {
|
||||||
|
addMQTTConnector();
|
||||||
|
brokerService.start();
|
||||||
|
|
||||||
|
final MQTTClientProvider provider = getMQTTClientProvider();
|
||||||
|
initializeConnection(provider);
|
||||||
|
provider.subscribe("foo",AT_MOST_ONCE);
|
||||||
|
for (int i = 0; i < numberOfMessages; i++) {
|
||||||
|
String payload = "Test Message: " + i;
|
||||||
|
provider.publish("foo", payload.getBytes(), AT_LEAST_ONCE);
|
||||||
|
byte[] message = provider.receive(5000);
|
||||||
|
assertNotNull("Should get a message", message);
|
||||||
|
assertEquals(payload, new String(message));
|
||||||
|
}
|
||||||
|
provider.disconnect();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(timeout=300000)
|
||||||
|
public void testSendAndReceiveAtMostOnce() throws Exception {
|
||||||
|
addMQTTConnector();
|
||||||
|
brokerService.start();
|
||||||
|
|
||||||
|
final MQTTClientProvider provider = getMQTTClientProvider();
|
||||||
|
initializeConnection(provider);
|
||||||
|
provider.subscribe("foo",AT_MOST_ONCE);
|
||||||
|
for (int i = 0; i < numberOfMessages; i++) {
|
||||||
|
String payload = "Test Message: " + i;
|
||||||
|
provider.publish("foo", payload.getBytes(), AT_MOST_ONCE);
|
||||||
|
byte[] message = provider.receive(5000);
|
||||||
|
assertNotNull("Should get a message", message);
|
||||||
|
assertEquals(payload, new String(message));
|
||||||
|
}
|
||||||
|
provider.disconnect();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(timeout=300000)
|
||||||
|
public void testSendAndReceiveAtLeastOnce() throws Exception {
|
||||||
|
addMQTTConnector();
|
||||||
|
brokerService.start();
|
||||||
|
|
||||||
|
final MQTTClientProvider provider = getMQTTClientProvider();
|
||||||
|
initializeConnection(provider);
|
||||||
|
provider.subscribe("foo",AT_LEAST_ONCE);
|
||||||
|
for (int i = 0; i < numberOfMessages; i++) {
|
||||||
|
String payload = "Test Message: " + i;
|
||||||
|
provider.publish("foo", payload.getBytes(), AT_LEAST_ONCE);
|
||||||
|
byte[] message = provider.receive(5000);
|
||||||
|
assertNotNull("Should get a message", message);
|
||||||
|
assertEquals(payload, new String(message));
|
||||||
|
}
|
||||||
|
provider.disconnect();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(timeout=300000)
|
||||||
|
public void testSendAndReceiveExactlyOnce() throws Exception {
|
||||||
|
addMQTTConnector();
|
||||||
|
brokerService.start();
|
||||||
|
final MQTTClientProvider publisher = getMQTTClientProvider();
|
||||||
|
initializeConnection(publisher);
|
||||||
|
|
||||||
|
final MQTTClientProvider subscriber = getMQTTClientProvider();
|
||||||
|
initializeConnection(subscriber);
|
||||||
|
|
||||||
|
subscriber.subscribe("foo",EXACTLY_ONCE);
|
||||||
|
for (int i = 0; i < numberOfMessages; i++) {
|
||||||
|
String payload = "Test Message: " + i;
|
||||||
|
publisher.publish("foo", payload.getBytes(), EXACTLY_ONCE);
|
||||||
|
byte[] message = subscriber.receive(5000);
|
||||||
|
assertNotNull("Should get a message + ["+ i + "]", message);
|
||||||
|
assertEquals(payload, new String(message));
|
||||||
|
}
|
||||||
|
subscriber.disconnect();
|
||||||
|
publisher.disconnect();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(timeout=300000)
|
||||||
|
public void testSendAndReceiveLargeMessages() throws Exception {
|
||||||
|
byte[] payload = new byte[1024 * 32];
|
||||||
|
for (int i = 0; i < payload.length; i++){
|
||||||
|
payload[i] = '2';
|
||||||
|
}
|
||||||
|
addMQTTConnector();
|
||||||
|
brokerService.start();
|
||||||
|
|
||||||
|
final MQTTClientProvider publisher = getMQTTClientProvider();
|
||||||
|
initializeConnection(publisher);
|
||||||
|
|
||||||
|
final MQTTClientProvider subscriber = getMQTTClientProvider();
|
||||||
|
initializeConnection(subscriber);
|
||||||
|
|
||||||
|
subscriber.subscribe("foo",AT_LEAST_ONCE);
|
||||||
|
for (int i = 0; i < 10; i++) {
|
||||||
|
publisher.publish("foo", payload, AT_LEAST_ONCE);
|
||||||
|
byte[] message = subscriber.receive(5000);
|
||||||
|
assertNotNull("Should get a message", message);
|
||||||
|
|
||||||
|
assertArrayEquals(payload, message);
|
||||||
|
}
|
||||||
|
subscriber.disconnect();
|
||||||
|
publisher.disconnect();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(timeout=300000)
|
||||||
|
public void testSendMQTTReceiveJMS() throws Exception {
|
||||||
|
addMQTTConnector();
|
||||||
|
TransportConnector openwireTransport = brokerService.addConnector("tcp://localhost:0");
|
||||||
|
brokerService.start();
|
||||||
|
|
||||||
|
final MQTTClientProvider provider = getMQTTClientProvider();
|
||||||
|
initializeConnection(provider);
|
||||||
|
final String DESTINATION_NAME = "foo.*";
|
||||||
|
|
||||||
|
ActiveMQConnection activeMQConnection = (ActiveMQConnection) new ActiveMQConnectionFactory(openwireTransport.getConnectUri()).createConnection();
|
||||||
|
activeMQConnection.start();
|
||||||
|
Session s = activeMQConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
|
javax.jms.Topic jmsTopic = s.createTopic(DESTINATION_NAME);
|
||||||
|
MessageConsumer consumer = s.createConsumer(jmsTopic);
|
||||||
|
|
||||||
|
for (int i = 0; i < numberOfMessages; i++) {
|
||||||
|
String payload = "Test Message: " + i;
|
||||||
|
provider.publish("foo/bah", payload.getBytes(), AT_LEAST_ONCE);
|
||||||
|
ActiveMQMessage message = (ActiveMQMessage) consumer.receive(5000);
|
||||||
|
assertNotNull("Should get a message", message);
|
||||||
|
ByteSequence bs = message.getContent();
|
||||||
|
assertEquals(payload, new String(bs.data, bs.offset, bs.length));
|
||||||
|
}
|
||||||
|
|
||||||
|
activeMQConnection.close();
|
||||||
|
provider.disconnect();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(timeout=300000)
|
||||||
|
public void testSendJMSReceiveMQTT() throws Exception {
|
||||||
|
addMQTTConnector();
|
||||||
|
TransportConnector openwireTransport = brokerService.addConnector("tcp://localhost:0");
|
||||||
|
brokerService.start();
|
||||||
|
final MQTTClientProvider provider = getMQTTClientProvider();
|
||||||
|
initializeConnection(provider);
|
||||||
|
|
||||||
|
ActiveMQConnection activeMQConnection = (ActiveMQConnection) new ActiveMQConnectionFactory(openwireTransport.getConnectUri()).createConnection();
|
||||||
|
activeMQConnection.start();
|
||||||
|
Session s = activeMQConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
|
javax.jms.Topic jmsTopic = s.createTopic("foo.far");
|
||||||
|
MessageProducer producer = s.createProducer(jmsTopic);
|
||||||
|
|
||||||
|
provider.subscribe("foo/+",AT_MOST_ONCE);
|
||||||
|
for (int i = 0; i < numberOfMessages; i++) {
|
||||||
|
String payload = "This is Test Message: " + i;
|
||||||
|
TextMessage sendMessage = s.createTextMessage(payload);
|
||||||
|
producer.send(sendMessage);
|
||||||
|
byte[] message = provider.receive(5000);
|
||||||
|
assertNotNull("Should get a message", message);
|
||||||
|
|
||||||
|
assertEquals(payload, new String(message));
|
||||||
|
}
|
||||||
|
provider.disconnect();
|
||||||
|
activeMQConnection.close();
|
||||||
|
}
|
||||||
|
|
||||||
@Test(timeout=300000)
|
@Test(timeout=300000)
|
||||||
public void testPingKeepsInactivityMonitorAlive() throws Exception {
|
public void testPingKeepsInactivityMonitorAlive() throws Exception {
|
||||||
addMQTTConnector();
|
addMQTTConnector();
|
||||||
|
@ -287,8 +587,6 @@ public class MQTTTest extends AbstractMQTTTest {
|
||||||
return "mqtt";
|
return "mqtt";
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected MQTTClientProvider getMQTTClientProvider() {
|
protected MQTTClientProvider getMQTTClientProvider() {
|
||||||
return new FuseMQQTTClientProvider();
|
return new FuseMQQTTClientProvider();
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,59 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.activemq.transport.mqtt;
|
||||||
|
|
||||||
|
import org.apache.activemq.ActiveMQConnection;
|
||||||
|
import org.apache.activemq.ActiveMQConnectionFactory;
|
||||||
|
import org.apache.activemq.broker.TransportConnector;
|
||||||
|
import org.eclipse.paho.client.mqttv3.MqttClient;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import javax.jms.Message;
|
||||||
|
import javax.jms.MessageConsumer;
|
||||||
|
import javax.jms.Session;
|
||||||
|
|
||||||
|
public class PahoMQTTTest extends AbstractMQTTTest {
|
||||||
|
|
||||||
|
private static final Logger LOG = LoggerFactory.getLogger(PahoMQTTTest.class);
|
||||||
|
|
||||||
|
|
||||||
|
@Test(timeout=300000)
|
||||||
|
public void testSendAndReceiveMQTT() throws Exception {
|
||||||
|
addMQTTConnector();
|
||||||
|
TransportConnector openwireTransport = brokerService.addConnector("tcp://localhost:0");
|
||||||
|
brokerService.start();
|
||||||
|
|
||||||
|
ActiveMQConnection activeMQConnection = (ActiveMQConnection) new ActiveMQConnectionFactory(openwireTransport.getConnectUri()).createConnection();
|
||||||
|
activeMQConnection.start();
|
||||||
|
Session s = activeMQConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
|
MessageConsumer consumer = s.createConsumer(s.createTopic("test"));
|
||||||
|
|
||||||
|
MqttClient client = new MqttClient("tcp://localhost:" + mqttConnector.getConnectUri().getPort(), "clientid");
|
||||||
|
client.connect();
|
||||||
|
client.publish("test", "hello".getBytes(), 1, false);
|
||||||
|
|
||||||
|
Message msg = consumer.receive(100 * 5);
|
||||||
|
assertNotNull(msg);
|
||||||
|
|
||||||
|
client.disconnect();
|
||||||
|
client.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
}
|
13
pom.xml
13
pom.xml
|
@ -97,6 +97,7 @@
|
||||||
<org-apache-derby-version>10.10.1.1</org-apache-derby-version>
|
<org-apache-derby-version>10.10.1.1</org-apache-derby-version>
|
||||||
<org.osgi.core-version>4.3.1</org.osgi.core-version>
|
<org.osgi.core-version>4.3.1</org.osgi.core-version>
|
||||||
<p2psockets-version>1.1.2</p2psockets-version>
|
<p2psockets-version>1.1.2</p2psockets-version>
|
||||||
|
<paho-version>0.4.0</paho-version>
|
||||||
<linkedin-zookeeper-version>1.4.0</linkedin-zookeeper-version>
|
<linkedin-zookeeper-version>1.4.0</linkedin-zookeeper-version>
|
||||||
<zookeeper-version>3.4.5</zookeeper-version>
|
<zookeeper-version>3.4.5</zookeeper-version>
|
||||||
<qpid-proton-version>0.5</qpid-proton-version>
|
<qpid-proton-version>0.5</qpid-proton-version>
|
||||||
|
@ -365,6 +366,11 @@
|
||||||
<type>test-jar</type>
|
<type>test-jar</type>
|
||||||
<scope>test</scope>
|
<scope>test</scope>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.eclipse.paho</groupId>
|
||||||
|
<artifactId>mqtt-client</artifactId>
|
||||||
|
<version>${paho-version}</version>
|
||||||
|
</dependency>
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>org.apache.activemq</groupId>
|
<groupId>org.apache.activemq</groupId>
|
||||||
<artifactId>activemq-jaas</artifactId>
|
<artifactId>activemq-jaas</artifactId>
|
||||||
|
@ -1591,6 +1597,13 @@
|
||||||
<releases><enabled>true</enabled></releases>
|
<releases><enabled>true</enabled></releases>
|
||||||
<snapshots><enabled>false</enabled></snapshots>
|
<snapshots><enabled>false</enabled></snapshots>
|
||||||
</repository>
|
</repository>
|
||||||
|
<!-- for the paho dependency -->
|
||||||
|
<repository>
|
||||||
|
<id>eclipse.m2</id>
|
||||||
|
<url>https://repo.eclipse.org/content/groups/releases/</url>
|
||||||
|
<releases><enabled>true</enabled></releases>
|
||||||
|
<snapshots><enabled>false</enabled></snapshots>
|
||||||
|
</repository>
|
||||||
<repository>
|
<repository>
|
||||||
<id>com.fusesource.m2.snapshot</id>
|
<id>com.fusesource.m2.snapshot</id>
|
||||||
<url>http://repo.fusesource.com/nexus/content/repositories/snapshots/</url>
|
<url>http://repo.fusesource.com/nexus/content/repositories/snapshots/</url>
|
||||||
|
|
Loading…
Reference in New Issue