diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java index 8e83ed200e..97a74a9716 100644 --- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java +++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java @@ -91,6 +91,9 @@ public class MQTTProtocolConverter { public static final int V3_1 = 3; public static final int V3_1_1 = 4; + public static final String SINGLE_LEVEL_WILDCARD = "+"; + public static final String MULTI_LEVEL_WILDCARD = "#"; + private static final IdGenerator CONNECTION_ID_GENERATOR = new IdGenerator(); private static final MQTTFrame PING_RESP_FRAME = new PINGRESP().encode(); private static final double MQTT_KEEP_ALIVE_GRACE_PERIOD = 0.5; @@ -458,6 +461,12 @@ public class MQTTProtocolConverter { checkConnected(); LOG.trace("MQTT Rcv PUBLISH message:{} client:{} connection:{}", command.messageId(), clientId, connectionInfo.getConnectionId()); + //Both version 3.1 and 3.1.1 do not allow the topic name to contain a wildcard in the publish packet + if (containsMqttWildcard(command.topicName().toString())) { + // [MQTT-3.3.2-2]: The Topic Name in the PUBLISH Packet MUST NOT contain wildcard characters + getMQTTTransport().onException(IOExceptionSupport.create("The topic name must not contain wildcard characters.", null)); + return; + } ActiveMQMessage message = convertMessage(command); message.setProducerId(producerId); message.onSend(); @@ -820,6 +829,11 @@ public class MQTTProtocolConverter { return clientId; } + protected boolean containsMqttWildcard(String value) { + return value != null && (value.contains(SINGLE_LEVEL_WILDCARD) || + value.contains(MULTI_LEVEL_WILDCARD)); + } + protected MQTTSubscriptionStrategy findSubscriptionStrategy() throws IOException { if (subsciptionStrategy == null) { synchronized (STRATAGY_FINDER) { diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java index 3dd3348e77..a6e72e77b5 100644 --- a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java +++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java @@ -1163,6 +1163,37 @@ public class MQTTTest extends MQTTTestSupport { connection.disconnect(); } + @Test(timeout = 60 * 1000) + public void testPublishWildcard31() throws Exception { + testPublishWildcard("3.1"); + } + + @Test(timeout = 60 * 1000) + public void testPublishWildcard311() throws Exception { + testPublishWildcard("3.1.1"); + } + + private void testPublishWildcard(String version) throws Exception { + MQTT mqttPub = createMQTTConnection("MQTTPub-Client", true); + mqttPub.setVersion(version); + BlockingConnection blockingConnection = mqttPub.blockingConnection(); + blockingConnection.connect(); + String payload = "Test Message"; + try { + blockingConnection.publish("foo/#", payload.getBytes(), QoS.AT_LEAST_ONCE, false); + fail("Should not be able to publish with wildcard in topic."); + } catch (Exception ex) { + LOG.info("Exception expected on publish with wildcard in topic name"); + } + try { + blockingConnection.publish("foo/+", payload.getBytes(), QoS.AT_LEAST_ONCE, false); + fail("Should not be able to publish with wildcard in topic."); + } catch (Exception ex) { + LOG.info("Exception expected on publish with wildcard in topic name"); + } + blockingConnection.disconnect(); + } + @Test(timeout = 60 * 1000) public void testDuplicateClientId() throws Exception { // test link stealing enabled by default