https://issues.apache.org/jira/browse/AMQ-6002 - escape client id in virtual topic mqtt subscription

This commit is contained in:
Dejan Bosanac 2015-10-07 11:28:19 +02:00
parent bbcd938032
commit aa743cbd7a
2 changed files with 68 additions and 52 deletions

View File

@ -84,22 +84,21 @@ public class MQTTVirtualTopicSubscriptionStrategy extends AbstractMQTTSubscripti
ActiveMQDestination destination = null; ActiveMQDestination destination = null;
int prefetch = ActiveMQPrefetchPolicy.DEFAULT_QUEUE_PREFETCH; int prefetch = ActiveMQPrefetchPolicy.DEFAULT_QUEUE_PREFETCH;
ConsumerInfo consumerInfo = new ConsumerInfo(getNextConsumerId()); ConsumerInfo consumerInfo = new ConsumerInfo(getNextConsumerId());
String converted = convertMQTTToActiveMQ(topicName);
if (!protocol.isCleanSession() && protocol.getClientId() != null && requestedQoS.ordinal() >= QoS.AT_LEAST_ONCE.ordinal()) { if (!protocol.isCleanSession() && protocol.getClientId() != null && requestedQoS.ordinal() >= QoS.AT_LEAST_ONCE.ordinal()) {
String converted = convertMQTTToActiveMQ(topicName);
if (converted.startsWith(VIRTUALTOPIC_PREFIX)) { if (converted.startsWith(VIRTUALTOPIC_PREFIX)) {
destination = new ActiveMQTopic(converted); destination = new ActiveMQTopic(converted);
prefetch = ActiveMQPrefetchPolicy.DEFAULT_DURABLE_TOPIC_PREFETCH; prefetch = ActiveMQPrefetchPolicy.DEFAULT_DURABLE_TOPIC_PREFETCH;
consumerInfo.setSubscriptionName(requestedQoS + ":" + topicName); consumerInfo.setSubscriptionName(requestedQoS + ":" + topicName);
} else { } else {
converted = VIRTUALTOPIC_CONSUMER_PREFIX + converted = VIRTUALTOPIC_CONSUMER_PREFIX +
protocol.getClientId() + ":" + requestedQoS + "." + convertMQTTToActiveMQ(protocol.getClientId()) + ":" + requestedQoS + "." +
VIRTUALTOPIC_PREFIX + converted; VIRTUALTOPIC_PREFIX + converted;
destination = new ActiveMQQueue(converted); destination = new ActiveMQQueue(converted);
prefetch = ActiveMQPrefetchPolicy.DEFAULT_QUEUE_PREFETCH; prefetch = ActiveMQPrefetchPolicy.DEFAULT_QUEUE_PREFETCH;
} }
} else { } else {
String converted = convertMQTTToActiveMQ(topicName);
if (!converted.startsWith(VIRTUALTOPIC_PREFIX)) { if (!converted.startsWith(VIRTUALTOPIC_PREFIX)) {
converted = VIRTUALTOPIC_PREFIX + converted; converted = VIRTUALTOPIC_PREFIX + converted;
} }

View File

@ -43,6 +43,10 @@ public class PahoMQTTTest extends MQTTTestSupport {
private static final Logger LOG = LoggerFactory.getLogger(PahoMQTTTest.class); private static final Logger LOG = LoggerFactory.getLogger(PahoMQTTTest.class);
protected MessageConsumer createConsumer(Session s, String topic) throws Exception {
return s.createConsumer(s.createTopic(topic));
}
@Test(timeout = 300000) @Test(timeout = 300000)
public void testLotsOfClients() throws Exception { public void testLotsOfClients() throws Exception {
@ -52,7 +56,7 @@ public class PahoMQTTTest extends MQTTTestSupport {
ActiveMQConnection activeMQConnection = (ActiveMQConnection) cf.createConnection(); ActiveMQConnection activeMQConnection = (ActiveMQConnection) cf.createConnection();
activeMQConnection.start(); activeMQConnection.start();
Session s = activeMQConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); Session s = activeMQConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = s.createConsumer(s.createTopic("test")); MessageConsumer consumer = createConsumer(s, "test");
final AtomicInteger receiveCounter = new AtomicInteger(); final AtomicInteger receiveCounter = new AtomicInteger();
consumer.setMessageListener(new MessageListener() { consumer.setMessageListener(new MessageListener() {
@ -118,7 +122,7 @@ public class PahoMQTTTest extends MQTTTestSupport {
ActiveMQConnection activeMQConnection = (ActiveMQConnection) cf.createConnection(); ActiveMQConnection activeMQConnection = (ActiveMQConnection) cf.createConnection();
activeMQConnection.start(); activeMQConnection.start();
Session s = activeMQConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); Session s = activeMQConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = s.createConsumer(s.createTopic("test")); MessageConsumer consumer = createConsumer(s, "test");
MqttClient client = new MqttClient("tcp://localhost:" + getPort(), "clientid", new MemoryPersistence()); MqttClient client = new MqttClient("tcp://localhost:" + getPort(), "clientid", new MemoryPersistence());
client.connect(); client.connect();
@ -128,16 +132,11 @@ public class PahoMQTTTest extends MQTTTestSupport {
assertNotNull(msg); assertNotNull(msg);
client.disconnect(); client.disconnect();
client.close();
} }
@Test(timeout = 300000) @Test(timeout = 300000)
public void testSubs() throws Exception { public void testSubs() throws Exception {
stopBroker();
protocolConfig = "transport.subscriptionStrategy=mqtt-virtual-topic-subscriptions";
startBroker();
final DefaultListener listener = new DefaultListener(); final DefaultListener listener = new DefaultListener();
// subscriber connects and creates durable sub // subscriber connects and creates durable sub
MqttClient client = createClient(false, "receive", listener); MqttClient client = createClient(false, "receive", listener);
@ -195,10 +194,6 @@ public class PahoMQTTTest extends MQTTTestSupport {
@Test(timeout = 300000) @Test(timeout = 300000)
public void testOverlappingTopics() throws Exception { public void testOverlappingTopics() throws Exception {
stopBroker();
protocolConfig = "transport.subscriptionStrategy=mqtt-virtual-topic-subscriptions";
startBroker();
final DefaultListener listener = new DefaultListener(); final DefaultListener listener = new DefaultListener();
// subscriber connects and creates durable sub // subscriber connects and creates durable sub
MqttClient client = createClient(false, "receive", listener); MqttClient client = createClient(false, "receive", listener);
@ -278,7 +273,7 @@ public class PahoMQTTTest extends MQTTTestSupport {
public boolean isSatisified() throws Exception { public boolean isSatisified() throws Exception {
return listener.result != null; return listener.result != null;
} }
}, TimeUnit.SECONDS.toMillis(20))); }, TimeUnit.SECONDS.toMillis(5)));
assertNull(listener.result); assertNull(listener.result);
assertTrue(client.getPendingDeliveryTokens().length == 0); assertTrue(client.getPendingDeliveryTokens().length == 0);
@ -290,7 +285,7 @@ public class PahoMQTTTest extends MQTTTestSupport {
public boolean isSatisified() throws Exception { public boolean isSatisified() throws Exception {
return listener.result != null; return listener.result != null;
} }
}, TimeUnit.SECONDS.toMillis(20))); }, TimeUnit.SECONDS.toMillis(5)));
assertNull(listener.result); assertNull(listener.result);
assertTrue(client.getPendingDeliveryTokens().length == 0); assertTrue(client.getPendingDeliveryTokens().length == 0);
} }
@ -354,53 +349,75 @@ public class PahoMQTTTest extends MQTTTestSupport {
} }
@Test(timeout = 300000) @Test(timeout = 300000)
public void testVirtualTopicQueueRestore() throws Exception { public void testClientIdSpecialChars() throws Exception {
testClientIdSpecialChars(MqttConnectOptions.MQTT_VERSION_3_1);
testClientIdSpecialChars(MqttConnectOptions.MQTT_VERSION_3_1_1);
}
stopBroker(); protected void testClientId(String clientId, int mqttVersion, final DefaultListener clientAdminMqttCallback) throws Exception {
protocolConfig = "transport.subscriptionStrategy=mqtt-virtual-topic-subscriptions";
startBroker();
String user10 = "user10";
String password10 = "user10";
String clientId10 = "client-10";
String topic10 = "user10/";
MqttConnectOptions options10 = new MqttConnectOptions();
options10.setCleanSession(false);
options10.setUserName(user10);
options10.setPassword(password10.toCharArray());
MqttClient client10 = createClient(false, clientId10, null);
client10.subscribe(topic10 + clientId10 + "/#", 1);
client10.subscribe(topic10 + "#", 1);
String user1 = "user1";
String password1 = "user1";
String clientId1 = "client-1";
String topic1 = "user1/";
MqttConnectOptions options1 = new MqttConnectOptions(); MqttConnectOptions options1 = new MqttConnectOptions();
options1.setCleanSession(false); options1.setCleanSession(false);
options1.setUserName(user1); options1.setUserName("client1");
options1.setPassword(password1.toCharArray()); options1.setPassword("client1".toCharArray());
options1.setMqttVersion(mqttVersion);
final DefaultListener client1MqttCallback = new DefaultListener();
MqttClient client1 = createClient(options1, clientId, client1MqttCallback);
client1.setCallback(client1MqttCallback);
MqttClient client1 = createClient(false, clientId1, null); String topic = "client1/" + clientId + "/topic";
client1.subscribe(topic1 + clientId1 + "/#", 1); client1.subscribe(topic, 1);
client1.subscribe(topic1 + "#", 1);
RegionBroker regionBroker = (RegionBroker) brokerService.getBroker().getAdaptor(RegionBroker.class); String message = "Message from client: " + clientId;
client1.publish(topic, message.getBytes(), 1, false);
String[] queues = new String[]{"Consumer.client-10:AT_LEAST_ONCE.VirtualTopic.user10.>",
"Consumer.client-10:AT_LEAST_ONCE.VirtualTopic.user10.client-10.>",
"Consumer.client-1:AT_LEAST_ONCE.VirtualTopic.user1.>",
"Consumer.client-1:AT_LEAST_ONCE.VirtualTopic.user1.client-1.>"};
for (String queueName : queues) { assertTrue(Wait.waitFor(new Wait.Condition() {
Destination queue = regionBroker.getQueueRegion().getDestinations(new ActiveMQQueue(queueName)).iterator().next(); @Override
assertEquals("Queue " + queueName + " have more than one consumer", 1, queue.getConsumers().size()); public boolean isSatisified() throws Exception {
} return client1MqttCallback.result != null;
}
}, TimeUnit.SECONDS.toMillis(45), TimeUnit.MILLISECONDS.toMillis(200)));
assertEquals(message, client1MqttCallback.result);
assertEquals(1, client1MqttCallback.received);
assertTrue(Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return clientAdminMqttCallback.result != null;
}
}, TimeUnit.SECONDS.toMillis(45), TimeUnit.MILLISECONDS.toMillis(200)));
assertEquals(message, clientAdminMqttCallback.result);
assertTrue(client1.isConnected());
client1.disconnect();
} }
protected void testClientIdSpecialChars(int mqttVersion) throws Exception {
LOG.info("Test MQTT version {}", mqttVersion);
MqttConnectOptions optionsAdmin = new MqttConnectOptions();
optionsAdmin.setCleanSession(false);
optionsAdmin.setUserName("admin");
optionsAdmin.setPassword("admin".toCharArray());
DefaultListener clientAdminMqttCallback = new DefaultListener();
MqttClient clientAdmin = createClient(optionsAdmin, "admin", clientAdminMqttCallback);
clientAdmin.subscribe("#", 1);
testClientId(":%&&@.:llll", mqttVersion, clientAdminMqttCallback);
testClientId("Consumer:id:AT_LEAST_ONCE", mqttVersion, clientAdminMqttCallback);
testClientId("Consumer:qid:EXACTLY_ONCE:VirtualTopic", mqttVersion, clientAdminMqttCallback);
testClientId("Consumertestmin:testst:AT_LEAST_ONCE.VirtualTopic::AT_LEAST_ONCE", mqttVersion, clientAdminMqttCallback);
}
protected MqttClient createClient(boolean cleanSession, String clientId, MqttCallback listener) throws Exception { protected MqttClient createClient(boolean cleanSession, String clientId, MqttCallback listener) throws Exception {
MqttConnectOptions options = new MqttConnectOptions(); MqttConnectOptions options = new MqttConnectOptions();
options.setCleanSession(cleanSession); options.setCleanSession(cleanSession);
return createClient(options, clientId, listener);
}
protected MqttClient createClient(MqttConnectOptions options, String clientId, MqttCallback listener) throws Exception {
final MqttClient client = new MqttClient("tcp://localhost:" + getPort(), clientId, new MemoryPersistence()); final MqttClient client = new MqttClient("tcp://localhost:" + getPort(), clientId, new MemoryPersistence());
client.setCallback(listener); client.setCallback(listener);
client.connect(options); client.connect(options);