https://issues.apache.org/jira/browse/AMQ-6253 - mqtt composite destinations support for virtual topic subscriptions

This commit is contained in:
Dejan Bosanac 2016-04-15 10:58:19 +02:00
parent 19fd084a83
commit 6d20cba0e4
2 changed files with 47 additions and 3 deletions

View File

@ -182,10 +182,25 @@ public class MQTTVirtualTopicSubscriptionStrategy extends AbstractMQTTSubscripti
@Override
public ActiveMQDestination onSend(String topicName) {
if (!topicName.startsWith(VIRTUALTOPIC_PREFIX)) {
return new ActiveMQTopic(VIRTUALTOPIC_PREFIX + topicName);
ActiveMQTopic topic = new ActiveMQTopic(topicName);
if (topic.isComposite()) {
ActiveMQDestination[] composites = topic.getCompositeDestinations();
for (ActiveMQDestination composite : composites) {
composite.setPhysicalName(prefix(composite.getPhysicalName()));
}
ActiveMQTopic result = new ActiveMQTopic();
result.setCompositeDestinations(composites);
return result;
} else {
return new ActiveMQTopic(topicName);
return new ActiveMQTopic(prefix(topicName));
}
}
private String prefix(String topicName) {
if (!topicName.startsWith(VIRTUALTOPIC_PREFIX)) {
return VIRTUALTOPIC_PREFIX + topicName;
} else {
return topicName;
}
}

View File

@ -379,6 +379,35 @@ public class MQTTTest extends MQTTTestSupport {
assertEquals("Topic changed", publishedTopic, msg.getTopic());
}
@Test(timeout = 2 * 60 * 1000)
public void testMQTTCompositeDestinations() throws Exception {
MQTT mqtt = createMQTTConnection();
mqtt.setClientId("");
mqtt.setCleanSession(true);
BlockingConnection connection = mqtt.blockingConnection();
connection.connect();
Topic[] topics = {new Topic(utf8("a/1"), QoS.values()[AT_MOST_ONCE]), new Topic(utf8("a/2"), QoS.values()[AT_MOST_ONCE])};
connection.subscribe(topics);
String payload = "Test Message";
String publishedTopic = "a/1,a/2";
connection.publish(publishedTopic, payload.getBytes(), QoS.values()[AT_MOST_ONCE], false);
Message msg = connection.receive(1, TimeUnit.SECONDS);
assertNotNull(msg);
assertEquals("a/2", msg.getTopic());
msg = connection.receive(1, TimeUnit.SECONDS);
assertNotNull(msg);
assertEquals("a/1", msg.getTopic());
msg = connection.receive(1, TimeUnit.SECONDS);
assertNull(msg);
}
@Test(timeout = 2 * 60 * 1000)
public void testMQTTPathPatterns() throws Exception {
MQTT mqtt = createMQTTConnection();