Refactored to make it easier to test with multiple MQTT client providers

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1430496 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2013-01-08 20:33:16 +00:00
parent 55fef7200a
commit 09c450015e
7 changed files with 474 additions and 356 deletions

View File

@ -305,7 +305,6 @@ class MQTTProtocolConverter {
//by default subscribers are persistent
consumerInfo.setSubscriptionName(connect.clientId().toString());
}
MQTTSubscription mqttSubscription = new MQTTSubscription(this, topic.qos(), consumerInfo);
subscriptionsByConsumerId.put(id, mqttSubscription);
@ -364,13 +363,13 @@ class MQTTProtocolConverter {
if (sub != null) {
MessageAck ack = sub.createMessageAck(md);
PUBLISH publish = sub.createPublish((ActiveMQMessage) md.getMessage());
if (ack != null && sub.expectAck()) {
if (ack != null && sub.expectAck(publish)) {
synchronized (consumerAcks) {
consumerAcks.put(publish.messageId(), ack);
}
}
getMQTTTransport().sendToMQTT(publish.encode());
if (ack != null && !sub.expectAck()) {
if (ack != null && !sub.expectAck(publish)) {
getMQTTTransport().sendToActiveMQ(ack);
}
}
@ -683,11 +682,10 @@ class MQTTProtocolConverter {
/**
* Set the default keep alive time (in milliseconds) that would be used if configured on server side
* and the client sends a keep-alive value of 0 (zero) on a CONNECT frame
*
* @param defaultKeepAlive the keepAlive in milliseconds
* @param keepAlive the keepAlive in milliseconds
*/
public void setDefaultKeepAlive(long defaultKeepAlive) {
this.defaultKeepAlive = defaultKeepAlive;
public void setDefaultKeepAlive(long keepAlive) {
this.defaultKeepAlive = keepAlive;
}
public int getActiveMQSubscriptionPrefetch() {

View File

@ -56,16 +56,12 @@ class MQTTSubscription {
return publish;
}
public boolean expectAck() {
return qos != QoS.AT_MOST_ONCE;
}
public void setDestination(ActiveMQDestination destination) {
this.destination = destination;
}
public ActiveMQDestination getDestination() {
return destination;
public boolean expectAck(PUBLISH publish) {
QoS publishQoS = publish.qos();
if (publishQoS.compareTo(this.qos) > 0){
publishQoS = this.qos;
}
return !publishQoS.equals(QoS.AT_MOST_ONCE);
}
public ConsumerInfo getConsumerInfo() {

View File

@ -0,0 +1,345 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.mqtt;
import java.io.File;
import java.io.IOException;
import java.security.ProtectionDomain;
import java.util.LinkedList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.AutoFailTestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.util.ByteSequence;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import static org.junit.Assert.*;
public abstract class AbstractMQTTTest {
protected TransportConnector mqttConnector;
public static final int AT_MOST_ONCE =0;
public static final int AT_LEAST_ONCE = 1;
public static final int EXACTLY_ONCE =2;
public File basedir() throws IOException {
ProtectionDomain protectionDomain = getClass().getProtectionDomain();
return new File(new File(protectionDomain.getCodeSource().getLocation().getPath()), "../..").getCanonicalFile();
}
protected BrokerService brokerService;
protected LinkedList<Throwable> exceptions = new LinkedList<Throwable>();
protected int numberOfMessages;
AutoFailTestSupport autoFailTestSupport = new AutoFailTestSupport() {};
@Before
public void startBroker() throws Exception {
autoFailTestSupport.startAutoFailThread();
exceptions.clear();
brokerService = new BrokerService();
brokerService.setPersistent(false);
brokerService.setAdvisorySupport(false);
brokerService.setUseJmx(false);
this.numberOfMessages = 3000;
}
@After
public void stopBroker() throws Exception {
if (brokerService != null) {
brokerService.stop();
}
autoFailTestSupport.stopAutoFailThread();
}
@Test
public void testSendAndReceiveMQTT() throws Exception {
addMQTTConnector();
brokerService.start();
final MQTTClientProvider subscriptionProvider = getMQTTClientProvider();
initializeConnection(subscriptionProvider);
subscriptionProvider.subscribe("foo/bah",AT_MOST_ONCE);
final CountDownLatch latch = new CountDownLatch(numberOfMessages);
Thread thread = new Thread(new Runnable() {
public void run() {
for (int i = 0; i < numberOfMessages; i++){
try {
byte[] payload = subscriptionProvider.receive(10000);
assertNotNull("Should get a message", payload);
latch.countDown();
} catch (Exception e) {
e.printStackTrace();
break;
}
}
}
});
thread.start();
final MQTTClientProvider publishProvider = getMQTTClientProvider();
initializeConnection(publishProvider);
for (int i = 0; i < numberOfMessages; i++){
String payload = "Message " + i;
publishProvider.publish("foo/bah",payload.getBytes(),AT_LEAST_ONCE);
}
latch.await(10, TimeUnit.SECONDS);
assertEquals(0, latch.getCount());
subscriptionProvider.disconnect();
publishProvider.disconnect();
}
@Test
public void testSendAtMostOnceReceiveExactlyOnce() throws Exception {
/**
* Although subscribing with EXACTLY ONCE, the message gets published
* with AT_MOST_ONCE - in MQTT the QoS is always determined by the message
* as published - not the wish of the subscriber
*/
addMQTTConnector();
brokerService.start();
final MQTTClientProvider provider = getMQTTClientProvider();
initializeConnection(provider);
provider.subscribe("foo",EXACTLY_ONCE);
for (int i = 0; i < numberOfMessages; i++) {
String payload = "Test Message: " + i;
provider.publish("foo", payload.getBytes(), AT_MOST_ONCE);
byte[] message = provider.receive(5000);
assertNotNull("Should get a message", message);
assertEquals(payload, new String(message));
}
provider.disconnect();
}
@Test
public void testSendAtLeastOnceReceiveExactlyOnce() throws Exception {
addMQTTConnector();
brokerService.start();
final MQTTClientProvider provider = getMQTTClientProvider();
initializeConnection(provider);
provider.subscribe("foo",EXACTLY_ONCE);
for (int i = 0; i < numberOfMessages; i++) {
String payload = "Test Message: " + i;
provider.publish("foo", payload.getBytes(), AT_LEAST_ONCE);
byte[] message = provider.receive(5000);
assertNotNull("Should get a message", message);
assertEquals(payload, new String(message));
}
provider.disconnect();
}
@Test
public void testSendAtLeastOnceReceiveAtMostOnce() throws Exception {
addMQTTConnector();
brokerService.start();
final MQTTClientProvider provider = getMQTTClientProvider();
initializeConnection(provider);
provider.subscribe("foo",AT_MOST_ONCE);
for (int i = 0; i < numberOfMessages; i++) {
String payload = "Test Message: " + i;
provider.publish("foo", payload.getBytes(), AT_LEAST_ONCE);
byte[] message = provider.receive(5000);
assertNotNull("Should get a message", message);
assertEquals(payload, new String(message));
}
provider.disconnect();
}
@Test
public void testSendAndReceiveAtMostOnce() throws Exception {
addMQTTConnector();
brokerService.start();
final MQTTClientProvider provider = getMQTTClientProvider();
initializeConnection(provider);
provider.subscribe("foo",AT_MOST_ONCE);
for (int i = 0; i < numberOfMessages; i++) {
String payload = "Test Message: " + i;
provider.publish("foo", payload.getBytes(), AT_MOST_ONCE);
byte[] message = provider.receive(5000);
assertNotNull("Should get a message", message);
assertEquals(payload, new String(message));
}
provider.disconnect();
}
@Test
public void testSendAndReceiveAtLeastOnce() throws Exception {
addMQTTConnector();
brokerService.start();
final MQTTClientProvider provider = getMQTTClientProvider();
initializeConnection(provider);
provider.subscribe("foo",AT_LEAST_ONCE);
for (int i = 0; i < numberOfMessages; i++) {
String payload = "Test Message: " + i;
provider.publish("foo", payload.getBytes(), AT_LEAST_ONCE);
byte[] message = provider.receive(5000);
assertNotNull("Should get a message", message);
assertEquals(payload, new String(message));
}
provider.disconnect();
}
@Test
public void testSendAndReceiveExactlyOnce() throws Exception {
addMQTTConnector();
brokerService.start();
final MQTTClientProvider publisher = getMQTTClientProvider();
initializeConnection(publisher);
final MQTTClientProvider subscriber = getMQTTClientProvider();
initializeConnection(subscriber);
subscriber.subscribe("foo",EXACTLY_ONCE);
for (int i = 0; i < numberOfMessages; i++) {
String payload = "Test Message: " + i;
publisher.publish("foo", payload.getBytes(), EXACTLY_ONCE);
byte[] message = subscriber.receive(5000);
assertNotNull("Should get a message", message);
assertEquals(payload, new String(message));
}
subscriber.disconnect();
publisher.disconnect();
}
@Test
public void testSendAndReceiveLargeMessages() throws Exception {
byte[] payload = new byte[1024 * 32];
for (int i = 0; i < payload.length; i++){
payload[i] = '2';
}
addMQTTConnector();
brokerService.start();
final MQTTClientProvider publisher = getMQTTClientProvider();
initializeConnection(publisher);
final MQTTClientProvider subscriber = getMQTTClientProvider();
initializeConnection(subscriber);
subscriber.subscribe("foo",AT_LEAST_ONCE);
for (int i = 0; i < 10; i++) {
publisher.publish("foo", payload, AT_LEAST_ONCE);
byte[] message = subscriber.receive(5000);
assertNotNull("Should get a message", message);
assertArrayEquals(payload, message);
}
subscriber.disconnect();
publisher.disconnect();
}
@Test
public void testSendMQTTReceiveJMS() throws Exception {
addMQTTConnector();
TransportConnector openwireTransport = brokerService.addConnector("tcp://localhost:0");
brokerService.start();
final MQTTClientProvider provider = getMQTTClientProvider();
initializeConnection(provider);
final String DESTINATION_NAME = "foo.*";
ActiveMQConnection activeMQConnection = (ActiveMQConnection) new ActiveMQConnectionFactory(openwireTransport.getConnectUri()).createConnection();
activeMQConnection.start();
Session s = activeMQConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
javax.jms.Topic jmsTopic = s.createTopic(DESTINATION_NAME);
MessageConsumer consumer = s.createConsumer(jmsTopic);
for (int i = 0; i < numberOfMessages; i++) {
String payload = "Test Message: " + i;
provider.publish("foo/bah", payload.getBytes(), AT_LEAST_ONCE);
ActiveMQMessage message = (ActiveMQMessage) consumer.receive(5000);
assertNotNull("Should get a message", message);
ByteSequence bs = message.getContent();
assertEquals(payload, new String(bs.data, bs.offset, bs.length));
}
activeMQConnection.close();
provider.disconnect();
}
@Test
public void testSendJMSReceiveMQTT() throws Exception {
addMQTTConnector();
TransportConnector openwireTransport = brokerService.addConnector("tcp://localhost:0");
brokerService.start();
final MQTTClientProvider provider = getMQTTClientProvider();
initializeConnection(provider);
ActiveMQConnection activeMQConnection = (ActiveMQConnection) new ActiveMQConnectionFactory(openwireTransport.getConnectUri()).createConnection();
activeMQConnection.start();
Session s = activeMQConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
javax.jms.Topic jmsTopic = s.createTopic("foo.far");
MessageProducer producer = s.createProducer(jmsTopic);
provider.subscribe("foo/+",AT_MOST_ONCE);
for (int i = 0; i < numberOfMessages; i++) {
String payload = "This is Test Message: " + i;
TextMessage sendMessage = s.createTextMessage(payload);
producer.send(sendMessage);
byte[] message = provider.receive(5000);
assertNotNull("Should get a message", message);
assertEquals(payload, new String(message));
}
provider.disconnect();
activeMQConnection.close();
}
protected String getProtocolScheme() {
return "mqtt";
}
protected void addMQTTConnector() throws Exception {
addMQTTConnector("");
}
protected void addMQTTConnector(String config) throws Exception {
mqttConnector= brokerService.addConnector(getProtocolScheme()+"://localhost:0" + config);
}
protected void initializeConnection(MQTTClientProvider provider) throws Exception {
provider.connect("tcp://localhost:"+mqttConnector.getConnectUri().getPort());
}
protected abstract MQTTClientProvider getMQTTClientProvider();
}

View File

@ -0,0 +1,75 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.mqtt;
import java.util.concurrent.TimeUnit;
import javax.net.ssl.SSLContext;
import org.fusesource.mqtt.client.BlockingConnection;
import org.fusesource.mqtt.client.MQTT;
import org.fusesource.mqtt.client.Message;
import org.fusesource.mqtt.client.QoS;
import org.fusesource.mqtt.client.Topic;
import static org.fusesource.hawtbuf.UTF8Buffer.utf8;
class FuseMQQTTClientProvider implements MQTTClientProvider {
private final MQTT mqtt = new MQTT();
private BlockingConnection connection;
@Override
public void connect(String host) throws Exception {
mqtt.setHost(host);
// shut off connect retry
mqtt.setConnectAttemptsMax(0);
mqtt.setReconnectAttemptsMax(0);
connection = mqtt.blockingConnection();
connection.connect();
}
@Override
public void disconnect() throws Exception {
if (this.connection != null){
this.connection.disconnect();
}
}
@Override
public void publish(String topic, byte[] payload, int qos) throws Exception {
connection.publish(topic,payload, QoS.values()[qos],false);
}
@Override
public void subscribe(String topic, int qos) throws Exception {
Topic[] topics = {new Topic(utf8(topic), QoS.values()[qos])};
connection.subscribe(topics);
}
@Override
public byte[] receive(int timeout) throws Exception {
byte[] result = null;
Message message = connection.receive(timeout, TimeUnit.MILLISECONDS);
if (message != null){
result = message.getPayload();
message.ack();
}
return result;
}
@Override
public void setSslContext(SSLContext sslContext) {
mqtt.setSslContext(sslContext);
}
}

View File

@ -0,0 +1,27 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.mqtt;
public interface MQTTClientProvider {
void connect(String host) throws Exception;
void disconnect() throws Exception;
void publish(String topic,byte[] payload,int qos) throws Exception;
void subscribe(String topic,int qos) throws Exception;
byte[] receive(int timeout) throws Exception;
void setSslContext(javax.net.ssl.SSLContext sslContext);
}

View File

@ -24,9 +24,7 @@ import javax.net.ssl.KeyManager;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManager;
import javax.net.ssl.X509TrustManager;
import org.apache.activemq.broker.BrokerService;
import org.fusesource.mqtt.client.MQTT;
import org.junit.Ignore;
public class MQTTSSLTest extends MQTTTest {
@ -55,6 +53,15 @@ public class MQTTSSLTest extends MQTTTest {
return mqtt;
}
protected void initializeConnection(MQTTClientProvider provider) throws Exception {
SSLContext ctx = SSLContext.getInstance("TLS");
ctx.init(new KeyManager[0], new TrustManager[]{new DefaultTrustManager()}, new SecureRandom());
provider.setSslContext(ctx);
provider.connect("ssl://localhost:"+mqttConnector.getConnectUri().getPort());
}
static class DefaultTrustManager implements X509TrustManager {
public void checkClientTrusted(X509Certificate[] x509Certificates, String s) throws CertificateException {

View File

@ -16,348 +16,14 @@
*/
package org.apache.activemq.transport.mqtt;
import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.security.ProtectionDomain;
import java.util.LinkedList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.net.SocketFactory;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.AutoFailTestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportListener;
import org.apache.activemq.transport.tcp.TcpTransport;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.Wait;
import org.fusesource.hawtbuf.UTF8Buffer;
import org.fusesource.mqtt.client.BlockingConnection;
import org.fusesource.mqtt.client.MQTT;
import org.fusesource.mqtt.client.Message;
import org.fusesource.mqtt.client.QoS;
import org.fusesource.mqtt.client.Topic;
import org.fusesource.mqtt.codec.CONNECT;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.fusesource.hawtbuf.UTF8Buffer.utf8;
import static org.junit.Assert.*;
import static org.junit.Assert.assertTrue;
public class MQTTTest {
public File basedir() throws IOException {
ProtectionDomain protectionDomain = getClass().getProtectionDomain();
return new File(new File(protectionDomain.getCodeSource().getLocation().getPath()), "../..").getCanonicalFile();
}
protected static final Logger LOG = LoggerFactory.getLogger(MQTTTest.class);
protected BrokerService brokerService;
protected LinkedList<Throwable> exceptions = new LinkedList<Throwable>();
protected int numberOfMessages;
AutoFailTestSupport autoFailTestSupport = new AutoFailTestSupport() {};
@Before
public void startBroker() throws Exception {
autoFailTestSupport.startAutoFailThread();
exceptions.clear();
brokerService = new BrokerService();
brokerService.setPersistent(false);
brokerService.setAdvisorySupport(false);
brokerService.setUseJmx(false);
this.numberOfMessages = 3000;
}
@After
public void stopBroker() throws Exception {
if (brokerService != null) {
brokerService.stop();
}
autoFailTestSupport.stopAutoFailThread();
}
@Test
public void testSendAndReceiveMQTT() throws Exception {
addMQTTConnector();
brokerService.start();
MQTT mqtt = createMQTTConnection();
final BlockingConnection subscribeConnection = mqtt.blockingConnection();
subscribeConnection.connect();
Topic topic = new Topic("foo/bah",QoS.AT_MOST_ONCE);
Topic[] topics = {topic};
subscribeConnection.subscribe(topics);
final CountDownLatch latch = new CountDownLatch(numberOfMessages);
Thread thread = new Thread(new Runnable() {
public void run() {
for (int i = 0; i < numberOfMessages; i++){
try {
Message message = subscribeConnection.receive(5, TimeUnit.SECONDS);
assertNotNull("Should get a message", message);
message.ack();
latch.countDown();
} catch (Exception e) {
e.printStackTrace();
break;
}
}
}
});
thread.start();
BlockingConnection publisherConnection = mqtt.blockingConnection();
publisherConnection.connect();
for (int i = 0; i < numberOfMessages; i++){
String payload = "Message " + i;
publisherConnection.publish(topic.name().toString(),payload.getBytes(),QoS.AT_LEAST_ONCE,false);
}
latch.await(10, TimeUnit.SECONDS);
assertEquals(0, latch.getCount());
subscribeConnection.disconnect();
publisherConnection.disconnect();
}
@Test
public void testSendAndReceiveAtMostOnce() throws Exception {
addMQTTConnector();
brokerService.start();
MQTT mqtt = createMQTTConnection();
mqtt.setKeepAlive(Short.MAX_VALUE);
BlockingConnection connection = mqtt.blockingConnection();
connection.connect();
Topic[] topics = {new Topic(utf8("foo"), QoS.AT_MOST_ONCE)};
connection.subscribe(topics);
for (int i = 0; i < numberOfMessages; i++) {
String payload = "Test Message: " + i;
connection.publish("foo", payload.getBytes(), QoS.AT_MOST_ONCE, false);
Message message = connection.receive(5, TimeUnit.SECONDS);
assertNotNull("Should get a message", message);
assertEquals(payload, new String(message.getPayload()));
}
connection.disconnect();
}
@Test
public void testSendAndReceiveAtLeastOnce() throws Exception {
addMQTTConnector();
brokerService.start();
MQTT mqtt = createMQTTConnection();
mqtt.setKeepAlive(Short.MAX_VALUE);
BlockingConnection connection = mqtt.blockingConnection();
connection.connect();
Topic[] topics = {new Topic(utf8("foo"), QoS.AT_LEAST_ONCE)};
connection.subscribe(topics);
for (int i = 0; i < numberOfMessages; i++) {
String payload = "Test Message: " + i;
connection.publish("foo", payload.getBytes(), QoS.AT_LEAST_ONCE, false);
Message message = connection.receive(5, TimeUnit.SECONDS);
assertNotNull("Should get a message", message);
message.ack();
assertEquals(payload, new String(message.getPayload()));
}
connection.disconnect();
}
@Test
public void testSendAndReceiveExactlyOnce() throws Exception {
addMQTTConnector();
brokerService.start();
MQTT publisher = createMQTTConnection();
BlockingConnection pubConnection = publisher.blockingConnection();
pubConnection.connect();
MQTT subscriber = createMQTTConnection();
BlockingConnection subConnection = subscriber.blockingConnection();
subConnection.connect();
Topic[] topics = {new Topic(utf8("foo"), QoS.EXACTLY_ONCE)};
subConnection.subscribe(topics);
for (int i = 0; i < numberOfMessages; i++) {
String payload = "Test Message: " + i;
pubConnection.publish("foo", payload.getBytes(), QoS.EXACTLY_ONCE, false);
Message message = subConnection.receive(5, TimeUnit.SECONDS);
assertNotNull("Should get a message", message);
LOG.debug(payload);
message.ack();
//System.err.println("Sent " + payload + " GOT " + new String(message.getPayload()));
assertEquals(payload, new String(message.getPayload()));
}
subConnection.disconnect();
pubConnection.disconnect();
}
@Test
public void testSendAndReceiveLargeMessages() throws Exception {
byte[] payload = new byte[1024 * 32];
for (int i = 0; i < payload.length; i++){
payload[i] = '2';
}
addMQTTConnector();
brokerService.start();
MQTT publisher = createMQTTConnection();
BlockingConnection pubConnection = publisher.blockingConnection();
pubConnection.connect();
MQTT subscriber = createMQTTConnection();
BlockingConnection subConnection = subscriber.blockingConnection();
subConnection.connect();
Topic[] topics = {new Topic(utf8("foo"), QoS.AT_LEAST_ONCE)};
subConnection.subscribe(topics);
for (int i = 0; i < 10; i++) {
pubConnection.publish("foo", payload, QoS.AT_LEAST_ONCE, false);
Message message = subConnection.receive(5, TimeUnit.SECONDS);
assertNotNull("Should get a message", message);
message.ack();
assertArrayEquals(payload, message.getPayload());
}
subConnection.disconnect();
pubConnection.disconnect();
}
@Test
public void testSendMQTTReceiveJMS() throws Exception {
addMQTTConnector();
TransportConnector openwireTransport = brokerService.addConnector("tcp://localhost:0");
brokerService.start();
MQTT mqtt = createMQTTConnection();
BlockingConnection connection = mqtt.blockingConnection();
final String DESTINATION_NAME = "foo.*";
connection.connect();
ActiveMQConnection activeMQConnection = (ActiveMQConnection) new ActiveMQConnectionFactory(openwireTransport.getConnectUri()).createConnection();
activeMQConnection.start();
Session s = activeMQConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
javax.jms.Topic jmsTopic = s.createTopic(DESTINATION_NAME);
MessageConsumer consumer = s.createConsumer(jmsTopic);
for (int i = 0; i < numberOfMessages; i++) {
String payload = "Test Message: " + i;
connection.publish("foo/bah", payload.getBytes(), QoS.AT_LEAST_ONCE, false);
ActiveMQMessage message = (ActiveMQMessage) consumer.receive(5000);
assertNotNull("Should get a message", message);
ByteSequence bs = message.getContent();
assertEquals(payload, new String(bs.data, bs.offset, bs.length));
}
activeMQConnection.close();
connection.disconnect();
}
@Test
public void testSendJMSReceiveMQTT() throws Exception {
addMQTTConnector();
TransportConnector openwireTransport = brokerService.addConnector("tcp://localhost:0");
brokerService.start();
MQTT mqtt = createMQTTConnection();
mqtt.setKeepAlive(Short.MAX_VALUE);
BlockingConnection connection = mqtt.blockingConnection();
connection.connect();
ActiveMQConnection activeMQConnection = (ActiveMQConnection) new ActiveMQConnectionFactory(openwireTransport.getConnectUri()).createConnection();
activeMQConnection.start();
Session s = activeMQConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
javax.jms.Topic jmsTopic = s.createTopic("foo.far");
MessageProducer producer = s.createProducer(jmsTopic);
Topic[] topics = {new Topic(utf8("foo/+"), QoS.AT_MOST_ONCE)};
connection.subscribe(topics);
for (int i = 0; i < numberOfMessages; i++) {
String payload = "This is Test Message: " + i;
TextMessage sendMessage = s.createTextMessage(payload);
producer.send(sendMessage);
Message message = connection.receive(5, TimeUnit.SECONDS);
assertNotNull("Should get a message", message);
message.ack();
assertEquals(payload, new String(message.getPayload()));
}
connection.disconnect();
}
public void testInactivityTimeoutDisconnectsClient() throws Exception{
addMQTTConnector();
brokerService.start();
// manually need to create the client so we don't send keep alive (PINGREQ) frames to keep the conn
// from timing out
Transport clientTransport = createManualMQTTClient();
clientTransport.start();
CONNECT connectFrame = new CONNECT().clientId(new UTF8Buffer("testClient")).keepAlive((short)2);
clientTransport.oneway(connectFrame.encode());
// wait for broker to register the MQTT connection
assertTrue("MQTT Connection should be registered.", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return mqttConnector.getConnections().size() > 0;
}
}));
// wait for broker to time out the MQTT connection due to inactivity
assertTrue("MQTT Connection should be timed out.", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return mqttConnector.getConnections().size() == 0;
}
}));
assertTrue("Should have seen client transport exception", exceptions.size() > 0);
clientTransport.stop();
}
private Transport createManualMQTTClient() throws IOException, URISyntaxException {
Transport clientTransport = new TcpTransport(new MQTTWireFormat(), SocketFactory.getDefault(),
new URI("tcp://localhost:"+mqttConnector.getConnectUri().getPort()), null);
clientTransport.setTransportListener(new TransportListener() {
@Override
public void onCommand(Object command) {
}
@Override
public void onException(IOException error) {
exceptions.add(error);
}
@Override
public void transportInterupted() {
}
@Override
public void transportResumed() {
}
});
return clientTransport;
}
public class MQTTTest extends AbstractMQTTTest {
@Test
public void testPingKeepsInactivityMonitorAlive() throws Exception {
@ -419,7 +85,6 @@ public class MQTTTest {
}));
}
TransportConnector mqttConnector;
protected String getProtocolScheme() {
return "mqtt";
@ -433,6 +98,11 @@ public class MQTTTest {
mqttConnector= brokerService.addConnector(getProtocolScheme()+"://localhost:0" + config);
}
@Override
protected MQTTClientProvider getMQTTClientProvider() {
return new FuseMQQTTClientProvider();
}
protected MQTT createMQTTConnection() throws Exception {
MQTT mqtt = new MQTT();
mqtt.setHost("localhost", mqttConnector.getConnectUri().getPort());