This commit is contained in:
Dejan Bosanac 2014-10-01 12:18:32 +02:00
parent 2d9475c4f0
commit fc3d90e8b7
3 changed files with 45 additions and 2 deletions

View File

@ -502,7 +502,6 @@ public class MQTTProtocolConverter {
destination = activeMQDestinationMap.get(command.topicName()); destination = activeMQDestinationMap.get(command.topicName());
if (destination == null) { if (destination == null) {
String topicName = MQTTProtocolSupport.convertMQTTToActiveMQ(command.topicName().toString()); String topicName = MQTTProtocolSupport.convertMQTTToActiveMQ(command.topicName().toString());
try { try {
destination = findSubscriptionStrategy().onSend(topicName); destination = findSubscriptionStrategy().onSend(topicName);
} catch (IOException e) { } catch (IOException e) {

View File

@ -79,7 +79,31 @@ public class MQTTProtocolSupport {
* @return a destination name formatted for MQTT. * @return a destination name formatted for MQTT.
*/ */
public static String convertActiveMQToMQTT(String destinationName) { public static String convertActiveMQToMQTT(String destinationName) {
return destinationName.replace('.', '/'); char[] chars = destinationName.toCharArray();
for (int i = 0; i < chars.length; i++) {
switch(chars[i]) {
case '>':
chars[i] = '#';
break;
case '#':
chars[i] = '>';
break;
case '*':
chars[i] = '+';
break;
case '+':
chars[i] = '*';
break;
case '.':
chars[i] = '/';
break;
case '/':
chars[i] = '.';
break;
}
}
String rc = new String(chars);
return rc;
} }
/** /**

View File

@ -16,6 +16,7 @@
*/ */
package org.apache.activemq.transport.mqtt; package org.apache.activemq.transport.mqtt;
import static org.fusesource.hawtbuf.UTF8Buffer.utf8;
import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNotEquals;
@ -329,6 +330,25 @@ public class MQTTTest extends MQTTTestSupport {
connection.disconnect(); connection.disconnect();
} }
@Test(timeout = 2 * 60 * 1000)
public void testMQTTWildcard() throws Exception {
MQTT mqtt = createMQTTConnection();
mqtt.setClientId("");
mqtt.setCleanSession(true);
BlockingConnection connection = mqtt.blockingConnection();
connection.connect();
Topic[] topics = {new Topic(utf8("a/#"), QoS.values()[AT_MOST_ONCE])};
connection.subscribe(topics);
String payload = "Test Message";
String publishedTopic = "a/b/1.2.3*4>";
connection.publish(publishedTopic, payload.getBytes(), QoS.values()[AT_MOST_ONCE], false);
Message msg = connection.receive(1, TimeUnit.SECONDS);
assertEquals("Topic changed", publishedTopic, msg.getTopic());
}
@Test(timeout = 2 * 60 * 1000) @Test(timeout = 2 * 60 * 1000)
public void testMQTTPathPatterns() throws Exception { public void testMQTTPathPatterns() throws Exception {
MQTT mqtt = createMQTTConnection(); MQTT mqtt = createMQTTConnection();