git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1492447 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Timothy A. Bish 2013-06-12 22:10:34 +00:00
parent 44e3c22652
commit 3845c7caa7
3 changed files with 44 additions and 3 deletions

View File

@ -314,7 +314,8 @@ class MQTTProtocolConverter {
consumerInfo.setDispatchAsync(true);
if (!connect.cleanSession() && (connect.clientId() != null)) {
//by default subscribers are persistent
consumerInfo.setSubscriptionName(connect.clientId().toString());
consumerInfo.setSubscriptionName(
connect.clientId().toString() + topic.name().toString());
}
MQTTSubscription mqttSubscription = new MQTTSubscription(this, topic.qos(), consumerInfo);

View File

@ -17,8 +17,6 @@
package org.apache.activemq.transport.mqtt;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import java.io.File;
import java.io.IOException;
@ -59,6 +57,7 @@ public abstract class AbstractMQTTTest extends AutoFailTestSupport {
protected LinkedList<Throwable> exceptions = new LinkedList<Throwable>();
protected int numberOfMessages;
@Override
@Before
public void setUp() throws Exception {
super.setUp();
@ -70,6 +69,7 @@ public abstract class AbstractMQTTTest extends AutoFailTestSupport {
this.numberOfMessages = 1000;
}
@Override
@After
public void tearDown() throws Exception {
if (brokerService != null) {

View File

@ -19,6 +19,9 @@ package org.apache.activemq.transport.mqtt;
import org.apache.activemq.util.Wait;
import org.fusesource.mqtt.client.BlockingConnection;
import org.fusesource.mqtt.client.MQTT;
import org.fusesource.mqtt.client.Message;
import org.fusesource.mqtt.client.QoS;
import org.fusesource.mqtt.client.Topic;
import org.fusesource.mqtt.client.Tracer;
import org.fusesource.mqtt.codec.MQTTFrame;
import org.junit.Test;
@ -69,6 +72,43 @@ public class MQTTTest extends AbstractMQTTTest {
connection.disconnect();
}
public void testSubscribeMultipleTopics() throws Exception {
byte[] payload = new byte[1024 * 32];
for (int i = 0; i < payload.length; i++){
payload[i] = '2';
}
addMQTTConnector();
brokerService.start();
MQTT mqtt = createMQTTConnection();
mqtt.setClientId("MQTT-Client");
mqtt.setCleanSession(false);
final BlockingConnection connection = mqtt.blockingConnection();
connection.connect();
Topic[] topics = {new Topic("TopicA", QoS.EXACTLY_ONCE), new Topic("TopicB", QoS.EXACTLY_ONCE)};
connection.subscribe(topics);
for (Topic topic : topics) {
connection.publish(topic.name().toString(), payload, QoS.AT_LEAST_ONCE, false);
}
int received = 0;
for (int i = 0; i < topics.length; ++i) {
Message message = connection.receive();
assertNotNull(message);
received++;
payload = message.getPayload();
String messageContent = new String(payload);
LOG.info("Received message from topic: " + message.getTopic() +
" Message content: " + messageContent);
message.ack();
}
assertEquals("Should have received " + topics.length + " messages", topics.length, received);
}
@Test(timeout=30000)
public void testDefaultKeepAliveWhenClientSpecifiesZero() throws Exception {
// default keep alive in milliseconds