ARTEMIS-4126 address not auto-created when sending MQTT msg
This commit is contained in:
parent
114302a093
commit
e531c61c95
|
@ -199,8 +199,9 @@ public class MQTTPublishManager {
|
|||
topic = message.variableHeader().topicName();
|
||||
}
|
||||
}
|
||||
|
||||
Message serverMessage = MQTTUtil.createServerMessageFromByteBuf(session, topic, message);
|
||||
String coreAddress = MQTTUtil.convertMqttTopicFilterToCoreAddress(topic, session.getWildcardConfiguration());
|
||||
SimpleString address = SimpleString.toSimpleString(coreAddress, session.getCoreMessageObjectPools().getAddressStringSimpleStringPool());
|
||||
Message serverMessage = MQTTUtil.createServerMessageFromByteBuf(session, address, message);
|
||||
int qos = message.fixedHeader().qosLevel().value();
|
||||
if (qos > 0) {
|
||||
serverMessage.setDurable(MQTTUtil.DURABLE_MESSAGES);
|
||||
|
@ -213,6 +214,9 @@ public class MQTTPublishManager {
|
|||
|
||||
Transaction tx = session.getServerSession().newTransaction();
|
||||
try {
|
||||
if (session.getServer().getAddressInfo(address) == null && session.getServer().getAddressSettingsRepository().getMatch(coreAddress).isAutoCreateAddresses()) {
|
||||
session.getServerSession().createAddress(address, RoutingType.MULTICAST, true);
|
||||
}
|
||||
session.getServerSession().send(tx, serverMessage, true, false);
|
||||
|
||||
if (message.fixedHeader().isRetain()) {
|
||||
|
|
|
@ -233,10 +233,8 @@ public class MQTTUtil {
|
|||
}
|
||||
|
||||
public static Message createServerMessageFromByteBuf(MQTTSession session,
|
||||
String topic,
|
||||
SimpleString address,
|
||||
MqttPublishMessage mqttPublishMessage) {
|
||||
String coreAddress = convertMqttTopicFilterToCoreAddress(topic, session.getWildcardConfiguration());
|
||||
SimpleString address = SimpleString.toSimpleString(coreAddress, session.getCoreMessageObjectPools().getAddressStringSimpleStringPool());
|
||||
ICoreMessage message = createServerMessage(session, address, mqttPublishMessage);
|
||||
|
||||
ByteBuf payload = mqttPublishMessage.payload();
|
||||
|
|
|
@ -80,6 +80,34 @@ public class MQTT5Test extends MQTT5TestSupport {
|
|||
context.close();
|
||||
}
|
||||
|
||||
@Test(timeout = DEFAULT_TIMEOUT)
|
||||
public void testAddressAutoCreation() throws Exception {
|
||||
final String DESTINATION = RandomUtil.randomString();
|
||||
server.getAddressSettingsRepository().addMatch(DESTINATION, new AddressSettings().setAutoCreateAddresses(true));
|
||||
|
||||
MqttClient producer = createPahoClient(RandomUtil.randomString());
|
||||
producer.connect();
|
||||
producer.publish(DESTINATION, new byte[0], 0, false);
|
||||
producer.disconnect();
|
||||
producer.close();
|
||||
|
||||
Wait.assertTrue(() -> server.getAddressInfo(SimpleString.toSimpleString(DESTINATION)) != null, 2000, 100);
|
||||
}
|
||||
|
||||
@Test(timeout = DEFAULT_TIMEOUT)
|
||||
public void testAddressAutoCreationNegative() throws Exception {
|
||||
final String DESTINATION = RandomUtil.randomString();
|
||||
server.getAddressSettingsRepository().addMatch(DESTINATION, new AddressSettings().setAutoCreateAddresses(false));
|
||||
|
||||
MqttClient producer = createPahoClient(RandomUtil.randomString());
|
||||
producer.connect();
|
||||
producer.publish(DESTINATION, new byte[0], 0, false);
|
||||
producer.disconnect();
|
||||
producer.close();
|
||||
|
||||
assertTrue(server.getAddressInfo(SimpleString.toSimpleString(DESTINATION)) == null);
|
||||
}
|
||||
|
||||
/*
|
||||
* Trying to reproduce error from https://issues.apache.org/jira/browse/ARTEMIS-1184
|
||||
*/
|
||||
|
|
|
@ -130,6 +130,9 @@ public class MQTT5TestSupport extends ActiveMQTestBase {
|
|||
protected String noprivUser = "noprivs";
|
||||
protected String noprivPass = "noprivs";
|
||||
|
||||
protected String createAddressUser = "createAddress";
|
||||
protected String createAddressPass = "createAddress";
|
||||
|
||||
protected String browseUser = "browser";
|
||||
protected String browsePass = "browser";
|
||||
|
||||
|
@ -201,6 +204,8 @@ public class MQTT5TestSupport extends ActiveMQTestBase {
|
|||
// User additions
|
||||
securityManager.getConfiguration().addUser(noprivUser, noprivPass);
|
||||
securityManager.getConfiguration().addRole(noprivUser, "nothing");
|
||||
securityManager.getConfiguration().addUser(createAddressUser, createAddressPass);
|
||||
securityManager.getConfiguration().addRole(createAddressUser, "createAddress");
|
||||
securityManager.getConfiguration().addUser(browseUser, browsePass);
|
||||
securityManager.getConfiguration().addRole(browseUser, "browser");
|
||||
securityManager.getConfiguration().addUser(guestUser, guestPass);
|
||||
|
@ -215,6 +220,7 @@ public class MQTT5TestSupport extends ActiveMQTestBase {
|
|||
value.add(new Role("browser", false, false, false, false, false, false, false, true, false, false));
|
||||
value.add(new Role("guest", false, true, false, false, false, false, false, true, false, false));
|
||||
value.add(new Role("full", true, true, true, true, true, true, true, true, true, true));
|
||||
value.add(new Role("createAddress", false, false, false, false, false, false, false, false, true, false));
|
||||
securityRepository.addMatch("#", value);
|
||||
|
||||
server.getConfiguration().setSecurityEnabled(true);
|
||||
|
|
|
@ -20,9 +20,14 @@ import java.nio.charset.StandardCharsets;
|
|||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||
import org.apache.activemq.artemis.api.core.management.CoreNotificationType;
|
||||
import org.apache.activemq.artemis.api.core.management.ManagementHelper;
|
||||
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTReasonCodes;
|
||||
import org.apache.activemq.artemis.core.security.CheckType;
|
||||
import org.apache.activemq.artemis.tests.integration.mqtt5.MQTT5TestSupport;
|
||||
import org.apache.activemq.artemis.tests.util.RandomUtil;
|
||||
import org.apache.activemq.artemis.utils.Wait;
|
||||
import org.eclipse.paho.mqttv5.client.MqttClient;
|
||||
import org.eclipse.paho.mqttv5.client.MqttConnectionOptions;
|
||||
import org.eclipse.paho.mqttv5.client.MqttConnectionOptionsBuilder;
|
||||
|
@ -42,8 +47,9 @@ public class PublishTestsWithSecurity extends MQTT5TestSupport {
|
|||
}
|
||||
|
||||
@Test(timeout = DEFAULT_TIMEOUT)
|
||||
public void testAuthorizationFailure() throws Exception {
|
||||
public void testCreateAddressAuthorizationFailure() throws Exception {
|
||||
final String CLIENT_ID = "publisher";
|
||||
final CountDownLatch latch = new CountDownLatch(1);
|
||||
MqttConnectionOptions options = new MqttConnectionOptionsBuilder()
|
||||
.username(noprivUser)
|
||||
.password(noprivPass.getBytes(StandardCharsets.UTF_8))
|
||||
|
@ -51,6 +57,12 @@ public class PublishTestsWithSecurity extends MQTT5TestSupport {
|
|||
MqttClient client = createPahoClient(CLIENT_ID);
|
||||
client.connect(options);
|
||||
|
||||
server.getManagementService().addNotificationListener(notification -> {
|
||||
if (notification.getType() == CoreNotificationType.SECURITY_PERMISSION_VIOLATION && CheckType.valueOf(notification.getProperties().getSimpleStringProperty(ManagementHelper.HDR_CHECK_TYPE).toString()) == CheckType.CREATE_ADDRESS) {
|
||||
latch.countDown();
|
||||
}
|
||||
});
|
||||
|
||||
try {
|
||||
client.publish("/foo", new byte[0], 2, false);
|
||||
fail("Publishing should have failed with a security problem");
|
||||
|
@ -60,9 +72,44 @@ public class PublishTestsWithSecurity extends MQTT5TestSupport {
|
|||
fail("Should have thrown an MqttException");
|
||||
}
|
||||
|
||||
assertTrue(latch.await(2, TimeUnit.SECONDS));
|
||||
|
||||
assertFalse(client.isConnected());
|
||||
}
|
||||
|
||||
@Test(timeout = DEFAULT_TIMEOUT)
|
||||
public void testSendAuthorizationFailure() throws Exception {
|
||||
final String CLIENT_ID = "publisher";
|
||||
final CountDownLatch latch = new CountDownLatch(1);
|
||||
MqttConnectionOptions options = new MqttConnectionOptionsBuilder()
|
||||
.username(createAddressUser)
|
||||
.password(createAddressPass.getBytes(StandardCharsets.UTF_8))
|
||||
.build();
|
||||
MqttClient client = createPahoClient(CLIENT_ID);
|
||||
client.connect(options);
|
||||
|
||||
server.getManagementService().addNotificationListener(notification -> {
|
||||
if (notification.getType() == CoreNotificationType.SECURITY_PERMISSION_VIOLATION && CheckType.valueOf(notification.getProperties().getSimpleStringProperty(ManagementHelper.HDR_CHECK_TYPE).toString()) == CheckType.SEND) {
|
||||
latch.countDown();
|
||||
}
|
||||
});
|
||||
|
||||
try {
|
||||
client.publish("/foo", new byte[0], 2, false);
|
||||
fail("Publishing should have failed with a security problem");
|
||||
} catch (MqttException e) {
|
||||
assertEquals(MQTTReasonCodes.NOT_AUTHORIZED, (byte) e.getReasonCode());
|
||||
} catch (Exception e) {
|
||||
fail("Should have thrown an MqttException");
|
||||
}
|
||||
|
||||
assertTrue(latch.await(2, TimeUnit.SECONDS));
|
||||
|
||||
assertFalse(client.isConnected());
|
||||
|
||||
Wait.assertTrue(() -> server.getAddressInfo(SimpleString.toSimpleString(".foo")) != null, 2000, 100);
|
||||
}
|
||||
|
||||
@Test(timeout = DEFAULT_TIMEOUT)
|
||||
public void testAuthorizationSuccess() throws Exception {
|
||||
final String CLIENT_ID = "publisher";
|
||||
|
|
Loading…
Reference in New Issue