https://issues.apache.org/jira/browse/AMQ-5292 Added support for publishDollarTopics transport option for MQTT to support Topics with '$' prefix

This commit is contained in:
Dhiraj Bokde 2014-07-28 12:38:43 -07:00
parent 207d4cdee6
commit 73908d6498
3 changed files with 66 additions and 10 deletions

View File

@ -133,6 +133,7 @@ public class MQTTProtocolConverter {
private int activeMQSubscriptionPrefetch=1;
protected static final String QOS_PROPERTY_NAME = "ActiveMQ.MQTT.QoS";
private final MQTTPacketIdGenerator packetIdGenerator;
private boolean publishDollarTopics;
public MQTTProtocolConverter(MQTTTransport mqttTransport, BrokerService brokerService) {
this.mqttTransport = mqttTransport;
@ -152,7 +153,7 @@ public class MQTTProtocolConverter {
// Lets intercept message send requests..
if( command instanceof ActiveMQMessage) {
ActiveMQMessage msg = (ActiveMQMessage) command;
if( msg.getDestination().getPhysicalName().startsWith("$") ) {
if( !getPublishDollarTopics() && msg.getDestination().getPhysicalName().startsWith("$") ) {
// We don't allow users to send to $ prefixed topics to avoid failing MQTT 3.1.1 spec requirements
if( handler!=null ) {
try {
@ -971,4 +972,12 @@ public class MQTTProtocolConverter {
public MQTTPacketIdGenerator getPacketIdGenerator() {
return packetIdGenerator;
}
public void setPublishDollarTopics(boolean publishDollarTopics) {
this.publishDollarTopics = publishDollarTopics;
}
public boolean getPublishDollarTopics() {
return publishDollarTopics;
}
}

View File

@ -189,6 +189,14 @@ public class MQTTTransportFilter extends TransportFilter implements MQTTTranspor
protocolConverter.setDefaultKeepAlive(defaultHeartBeat);
}
public boolean getPublishDollarTopics() {
return protocolConverter != null && protocolConverter.getPublishDollarTopics();
}
public void setPublishDollarTopics(boolean publishDollarTopics) {
protocolConverter.setPublishDollarTopics(publishDollarTopics);
}
public int getActiveMQSubscriptionPrefetch() {
return protocolConverter.getActiveMQSubscriptionPrefetch();
}

View File

@ -16,14 +16,6 @@
*/
package org.apache.activemq.transport.mqtt;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.net.ProtocolException;
import java.util.ArrayList;
import java.util.Arrays;
@ -35,7 +27,6 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.Destination;
@ -44,6 +35,13 @@ import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.broker.region.policy.LastImageSubscriptionRecoveryPolicy;
import org.apache.activemq.broker.region.policy.PolicyEntry;
@ -1045,6 +1043,47 @@ public class MQTTTest extends MQTTTestSupport {
connection.disconnect();
}
@Test(timeout = 60 * 1000)
public void testPublishDollarTopics() throws Exception {
stopBroker();
startBroker();
MQTT mqtt = createMQTTConnection();
final String clientId = "publishDollar";
mqtt.setClientId(clientId);
mqtt.setKeepAlive((short) 2);
BlockingConnection connection = mqtt.blockingConnection();
connection.connect();
final String DOLLAR_TOPIC = "$TopicA";
connection.subscribe(new Topic[] { new Topic(DOLLAR_TOPIC, QoS.EXACTLY_ONCE)});
connection.publish(DOLLAR_TOPIC, DOLLAR_TOPIC.getBytes(), QoS.EXACTLY_ONCE, true);
Message message = connection.receive(10, TimeUnit.SECONDS);
assertNull("Publish enabled for $ Topics by default", message);
connection.disconnect();
stopBroker();
protocolConfig = "transport.publishDollarTopics=true";
startBroker();
mqtt = createMQTTConnection();
mqtt.setClientId(clientId);
mqtt.setKeepAlive((short) 2);
connection = mqtt.blockingConnection();
connection.connect();
connection.subscribe(new Topic[] { new Topic(DOLLAR_TOPIC, QoS.EXACTLY_ONCE)});
connection.publish(DOLLAR_TOPIC, DOLLAR_TOPIC.getBytes(), QoS.EXACTLY_ONCE, true);
message = connection.receive(10, TimeUnit.SECONDS);
assertNotNull(message);
message.ack();
assertEquals("Message body", DOLLAR_TOPIC, new String(message.getPayload()));
connection.disconnect();
}
@Test(timeout = 30 * 10000)
public void testJmsMapping() throws Exception {
// start up jms consumer