diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java index 769656f107..3e645561e6 100644 --- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java +++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java @@ -133,6 +133,7 @@ public class MQTTProtocolConverter { private int activeMQSubscriptionPrefetch=1; protected static final String QOS_PROPERTY_NAME = "ActiveMQ.MQTT.QoS"; private final MQTTPacketIdGenerator packetIdGenerator; + private boolean publishDollarTopics; public MQTTProtocolConverter(MQTTTransport mqttTransport, BrokerService brokerService) { this.mqttTransport = mqttTransport; @@ -152,7 +153,7 @@ public class MQTTProtocolConverter { // Lets intercept message send requests.. if( command instanceof ActiveMQMessage) { ActiveMQMessage msg = (ActiveMQMessage) command; - if( msg.getDestination().getPhysicalName().startsWith("$") ) { + if( !getPublishDollarTopics() && msg.getDestination().getPhysicalName().startsWith("$") ) { // We don't allow users to send to $ prefixed topics to avoid failing MQTT 3.1.1 spec requirements if( handler!=null ) { try { @@ -971,4 +972,12 @@ public class MQTTProtocolConverter { public MQTTPacketIdGenerator getPacketIdGenerator() { return packetIdGenerator; } + + public void setPublishDollarTopics(boolean publishDollarTopics) { + this.publishDollarTopics = publishDollarTopics; + } + + public boolean getPublishDollarTopics() { + return publishDollarTopics; + } } diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFilter.java b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFilter.java index 181d2357db..8612c2517f 100644 --- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFilter.java +++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFilter.java @@ -189,6 +189,14 @@ public class MQTTTransportFilter extends TransportFilter implements MQTTTranspor protocolConverter.setDefaultKeepAlive(defaultHeartBeat); } + public boolean getPublishDollarTopics() { + return protocolConverter != null && protocolConverter.getPublishDollarTopics(); + } + + public void setPublishDollarTopics(boolean publishDollarTopics) { + protocolConverter.setPublishDollarTopics(publishDollarTopics); + } + public int getActiveMQSubscriptionPrefetch() { return protocolConverter.getActiveMQSubscriptionPrefetch(); } diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java index ec7f1ccde8..fc91f273d5 100644 --- a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java +++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java @@ -16,14 +16,6 @@ */ package org.apache.activemq.transport.mqtt; -import static org.junit.Assert.assertArrayEquals; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - import java.net.ProtocolException; import java.util.ArrayList; import java.util.Arrays; @@ -35,7 +27,6 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.regex.Pattern; - import javax.jms.BytesMessage; import javax.jms.Connection; import javax.jms.Destination; @@ -44,6 +35,13 @@ import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.broker.region.policy.LastImageSubscriptionRecoveryPolicy; import org.apache.activemq.broker.region.policy.PolicyEntry; @@ -1045,6 +1043,47 @@ public class MQTTTest extends MQTTTestSupport { connection.disconnect(); } + @Test(timeout = 60 * 1000) + public void testPublishDollarTopics() throws Exception { + stopBroker(); + startBroker(); + + MQTT mqtt = createMQTTConnection(); + final String clientId = "publishDollar"; + mqtt.setClientId(clientId); + mqtt.setKeepAlive((short) 2); + BlockingConnection connection = mqtt.blockingConnection(); + connection.connect(); + + final String DOLLAR_TOPIC = "$TopicA"; + connection.subscribe(new Topic[] { new Topic(DOLLAR_TOPIC, QoS.EXACTLY_ONCE)}); + connection.publish(DOLLAR_TOPIC, DOLLAR_TOPIC.getBytes(), QoS.EXACTLY_ONCE, true); + + Message message = connection.receive(10, TimeUnit.SECONDS); + assertNull("Publish enabled for $ Topics by default", message); + connection.disconnect(); + + stopBroker(); + protocolConfig = "transport.publishDollarTopics=true"; + startBroker(); + + mqtt = createMQTTConnection(); + mqtt.setClientId(clientId); + mqtt.setKeepAlive((short) 2); + connection = mqtt.blockingConnection(); + connection.connect(); + + connection.subscribe(new Topic[] { new Topic(DOLLAR_TOPIC, QoS.EXACTLY_ONCE)}); + connection.publish(DOLLAR_TOPIC, DOLLAR_TOPIC.getBytes(), QoS.EXACTLY_ONCE, true); + + message = connection.receive(10, TimeUnit.SECONDS); + assertNotNull(message); + message.ack(); + assertEquals("Message body", DOLLAR_TOPIC, new String(message.getPayload())); + + connection.disconnect(); + } + @Test(timeout = 30 * 10000) public void testJmsMapping() throws Exception { // start up jms consumer