diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java index 8f735b8628..42f1728c96 100644 --- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java +++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java @@ -314,7 +314,8 @@ class MQTTProtocolConverter { consumerInfo.setDispatchAsync(true); if (!connect.cleanSession() && (connect.clientId() != null)) { //by default subscribers are persistent - consumerInfo.setSubscriptionName(connect.clientId().toString()); + consumerInfo.setSubscriptionName( + connect.clientId().toString() + topic.name().toString()); } MQTTSubscription mqttSubscription = new MQTTSubscription(this, topic.qos(), consumerInfo); diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/AbstractMQTTTest.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/AbstractMQTTTest.java index 587ae852c8..8b0ad30347 100644 --- a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/AbstractMQTTTest.java +++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/AbstractMQTTTest.java @@ -17,8 +17,6 @@ package org.apache.activemq.transport.mqtt; import static org.junit.Assert.assertArrayEquals; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; import java.io.File; import java.io.IOException; @@ -59,6 +57,7 @@ public abstract class AbstractMQTTTest extends AutoFailTestSupport { protected LinkedList exceptions = new LinkedList(); protected int numberOfMessages; + @Override @Before public void setUp() throws Exception { super.setUp(); @@ -70,6 +69,7 @@ public abstract class AbstractMQTTTest extends AutoFailTestSupport { this.numberOfMessages = 1000; } + @Override @After public void tearDown() throws Exception { if (brokerService != null) { diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java index 3f3f410dd0..9761d91708 100644 --- a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java +++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java @@ -19,6 +19,9 @@ package org.apache.activemq.transport.mqtt; import org.apache.activemq.util.Wait; import org.fusesource.mqtt.client.BlockingConnection; import org.fusesource.mqtt.client.MQTT; +import org.fusesource.mqtt.client.Message; +import org.fusesource.mqtt.client.QoS; +import org.fusesource.mqtt.client.Topic; import org.fusesource.mqtt.client.Tracer; import org.fusesource.mqtt.codec.MQTTFrame; import org.junit.Test; @@ -69,6 +72,43 @@ public class MQTTTest extends AbstractMQTTTest { connection.disconnect(); } + public void testSubscribeMultipleTopics() throws Exception { + byte[] payload = new byte[1024 * 32]; + for (int i = 0; i < payload.length; i++){ + payload[i] = '2'; + } + + addMQTTConnector(); + brokerService.start(); + MQTT mqtt = createMQTTConnection(); + mqtt.setClientId("MQTT-Client"); + mqtt.setCleanSession(false); + + final BlockingConnection connection = mqtt.blockingConnection(); + connection.connect(); + + Topic[] topics = {new Topic("TopicA", QoS.EXACTLY_ONCE), new Topic("TopicB", QoS.EXACTLY_ONCE)}; + connection.subscribe(topics); + + for (Topic topic : topics) { + connection.publish(topic.name().toString(), payload, QoS.AT_LEAST_ONCE, false); + } + + int received = 0; + for (int i = 0; i < topics.length; ++i) { + Message message = connection.receive(); + assertNotNull(message); + received++; + payload = message.getPayload(); + String messageContent = new String(payload); + LOG.info("Received message from topic: " + message.getTopic() + + " Message content: " + messageContent); + message.ack(); + } + + assertEquals("Should have received " + topics.length + " messages", topics.length, received); + } + @Test(timeout=30000) public void testDefaultKeepAliveWhenClientSpecifiesZero() throws Exception { // default keep alive in milliseconds