mirror of https://github.com/apache/activemq.git
Fail to publish if the topic name in the publish packet contains a wild card character as per the MQTT V3.1 and V3.1.1 spec.
This commit is contained in:
parent
012e4d0a1c
commit
1b38b27ed2
|
@ -91,6 +91,9 @@ public class MQTTProtocolConverter {
|
||||||
public static final int V3_1 = 3;
|
public static final int V3_1 = 3;
|
||||||
public static final int V3_1_1 = 4;
|
public static final int V3_1_1 = 4;
|
||||||
|
|
||||||
|
public static final String SINGLE_LEVEL_WILDCARD = "+";
|
||||||
|
public static final String MULTI_LEVEL_WILDCARD = "#";
|
||||||
|
|
||||||
private static final IdGenerator CONNECTION_ID_GENERATOR = new IdGenerator();
|
private static final IdGenerator CONNECTION_ID_GENERATOR = new IdGenerator();
|
||||||
private static final MQTTFrame PING_RESP_FRAME = new PINGRESP().encode();
|
private static final MQTTFrame PING_RESP_FRAME = new PINGRESP().encode();
|
||||||
private static final double MQTT_KEEP_ALIVE_GRACE_PERIOD = 0.5;
|
private static final double MQTT_KEEP_ALIVE_GRACE_PERIOD = 0.5;
|
||||||
|
@ -458,6 +461,12 @@ public class MQTTProtocolConverter {
|
||||||
checkConnected();
|
checkConnected();
|
||||||
LOG.trace("MQTT Rcv PUBLISH message:{} client:{} connection:{}",
|
LOG.trace("MQTT Rcv PUBLISH message:{} client:{} connection:{}",
|
||||||
command.messageId(), clientId, connectionInfo.getConnectionId());
|
command.messageId(), clientId, connectionInfo.getConnectionId());
|
||||||
|
//Both version 3.1 and 3.1.1 do not allow the topic name to contain a wildcard in the publish packet
|
||||||
|
if (containsMqttWildcard(command.topicName().toString())) {
|
||||||
|
// [MQTT-3.3.2-2]: The Topic Name in the PUBLISH Packet MUST NOT contain wildcard characters
|
||||||
|
getMQTTTransport().onException(IOExceptionSupport.create("The topic name must not contain wildcard characters.", null));
|
||||||
|
return;
|
||||||
|
}
|
||||||
ActiveMQMessage message = convertMessage(command);
|
ActiveMQMessage message = convertMessage(command);
|
||||||
message.setProducerId(producerId);
|
message.setProducerId(producerId);
|
||||||
message.onSend();
|
message.onSend();
|
||||||
|
@ -820,6 +829,11 @@ public class MQTTProtocolConverter {
|
||||||
return clientId;
|
return clientId;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected boolean containsMqttWildcard(String value) {
|
||||||
|
return value != null && (value.contains(SINGLE_LEVEL_WILDCARD) ||
|
||||||
|
value.contains(MULTI_LEVEL_WILDCARD));
|
||||||
|
}
|
||||||
|
|
||||||
protected MQTTSubscriptionStrategy findSubscriptionStrategy() throws IOException {
|
protected MQTTSubscriptionStrategy findSubscriptionStrategy() throws IOException {
|
||||||
if (subsciptionStrategy == null) {
|
if (subsciptionStrategy == null) {
|
||||||
synchronized (STRATAGY_FINDER) {
|
synchronized (STRATAGY_FINDER) {
|
||||||
|
|
|
@ -1163,6 +1163,37 @@ public class MQTTTest extends MQTTTestSupport {
|
||||||
connection.disconnect();
|
connection.disconnect();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 60 * 1000)
|
||||||
|
public void testPublishWildcard31() throws Exception {
|
||||||
|
testPublishWildcard("3.1");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 60 * 1000)
|
||||||
|
public void testPublishWildcard311() throws Exception {
|
||||||
|
testPublishWildcard("3.1.1");
|
||||||
|
}
|
||||||
|
|
||||||
|
private void testPublishWildcard(String version) throws Exception {
|
||||||
|
MQTT mqttPub = createMQTTConnection("MQTTPub-Client", true);
|
||||||
|
mqttPub.setVersion(version);
|
||||||
|
BlockingConnection blockingConnection = mqttPub.blockingConnection();
|
||||||
|
blockingConnection.connect();
|
||||||
|
String payload = "Test Message";
|
||||||
|
try {
|
||||||
|
blockingConnection.publish("foo/#", payload.getBytes(), QoS.AT_LEAST_ONCE, false);
|
||||||
|
fail("Should not be able to publish with wildcard in topic.");
|
||||||
|
} catch (Exception ex) {
|
||||||
|
LOG.info("Exception expected on publish with wildcard in topic name");
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
blockingConnection.publish("foo/+", payload.getBytes(), QoS.AT_LEAST_ONCE, false);
|
||||||
|
fail("Should not be able to publish with wildcard in topic.");
|
||||||
|
} catch (Exception ex) {
|
||||||
|
LOG.info("Exception expected on publish with wildcard in topic name");
|
||||||
|
}
|
||||||
|
blockingConnection.disconnect();
|
||||||
|
}
|
||||||
|
|
||||||
@Test(timeout = 60 * 1000)
|
@Test(timeout = 60 * 1000)
|
||||||
public void testDuplicateClientId() throws Exception {
|
public void testDuplicateClientId() throws Exception {
|
||||||
// test link stealing enabled by default
|
// test link stealing enabled by default
|
||||||
|
|
Loading…
Reference in New Issue