diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTLogger.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTLogger.java index 17a1295eb6..a765de1e78 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTLogger.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTLogger.java @@ -80,4 +80,8 @@ public interface MQTTLogger extends BasicLogger { @LogMessage(level = Logger.Level.ERROR) @Message(id = 834006, value = "Failed to publish MQTT message: {0}.", format = Message.Format.MESSAGE_FORMAT) void failedToPublishMqttMessage(String exceptionMessage, @Cause Throwable t); + + @LogMessage(level = Logger.Level.ERROR) + @Message(id = 834007, value = "Authorization failure sending will message: {0}", format = Message.Format.MESSAGE_FORMAT) + void authorizationFailureSendingWillMessage(String message); } \ No newline at end of file diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java index ee1dfdd211..38d6e4cc44 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java @@ -214,11 +214,7 @@ public class MQTTPublishManager { Transaction tx = session.getServerSession().newTransaction(); try { - if (internal) { - session.getServer().getPostOffice().route(serverMessage, tx, true); - } else { - session.getServerSession().send(tx, serverMessage, true, false); - } + session.getServerSession().send(tx, serverMessage, true, false); if (message.fixedHeader().isRetain()) { ByteBuf payload = message.payload(); @@ -228,6 +224,9 @@ public class MQTTPublishManager { tx.commit(); } catch (ActiveMQSecurityException e) { tx.rollback(); + if (internal) { + throw e; + } if (session.getVersion() == MQTTVersion.MQTT_5) { sendMessageAck(internal, qos, messageId, MQTTReasonCodes.NOT_AUTHORIZED); return; diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java index d434ca55ba..db4295b100 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java @@ -25,6 +25,7 @@ import io.netty.handler.codec.mqtt.MqttMessageBuilders; import io.netty.handler.codec.mqtt.MqttProperties; import io.netty.handler.codec.mqtt.MqttPublishMessage; import io.netty.handler.codec.mqtt.MqttQoS; +import org.apache.activemq.artemis.api.core.ActiveMQSecurityException; import org.apache.activemq.artemis.core.config.WildcardConfiguration; import org.apache.activemq.artemis.core.persistence.CoreMessageObjectPools; import org.apache.activemq.artemis.core.server.ActiveMQServer; @@ -277,6 +278,8 @@ public class MQTTSession { getMqttPublishManager().sendToQueue(publishMessage, true); state.setWillSent(true); state.setWillMessage(null); + } catch (ActiveMQSecurityException e) { + MQTTLogger.LOGGER.authorizationFailureSendingWillMessage(e.getMessage()); } catch (Exception e) { MQTTLogger.LOGGER.errorSendingWillMessage(e); } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/controlpackets/PublishTestsWithSecurity.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/controlpackets/PublishTestsWithSecurity.java index ba107493ec..5b902fbcf7 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/controlpackets/PublishTestsWithSecurity.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/controlpackets/PublishTestsWithSecurity.java @@ -18,13 +18,17 @@ package org.apache.activemq.artemis.tests.integration.mqtt5.spec.controlpackets; import java.nio.charset.StandardCharsets; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import org.apache.activemq.artemis.core.protocol.mqtt.MQTTReasonCodes; import org.apache.activemq.artemis.tests.integration.mqtt5.MQTT5TestSupport; +import org.apache.activemq.artemis.tests.util.RandomUtil; import org.eclipse.paho.mqttv5.client.MqttClient; import org.eclipse.paho.mqttv5.client.MqttConnectionOptions; import org.eclipse.paho.mqttv5.client.MqttConnectionOptionsBuilder; import org.eclipse.paho.mqttv5.common.MqttException; +import org.eclipse.paho.mqttv5.common.MqttMessage; import org.jboss.logging.Logger; import org.junit.Test; @@ -83,4 +87,47 @@ public class PublishTestsWithSecurity extends MQTT5TestSupport { client.isConnected(); } + + @Test(timeout = DEFAULT_TIMEOUT) + public void testWillAuthorizationSuccess() throws Exception { + internalTestWillAuthorization(fullUser, fullPass, true); + } + + @Test(timeout = DEFAULT_TIMEOUT) + public void testWillAuthorizationFailure() throws Exception { + internalTestWillAuthorization(noprivUser, noprivPass, false); + } + + private void internalTestWillAuthorization(String username, String password, boolean succeed) throws Exception { + final byte[] WILL = RandomUtil.randomBytes(); + final String TOPIC = RandomUtil.randomString(); + + // consumer of the will message + MqttClient client1 = createPahoClient("willConsumer"); + CountDownLatch latch = new CountDownLatch(1); + client1.setCallback(new DefaultMqttCallback() { + @Override + public void messageArrived(String topic, MqttMessage message) { + latch.countDown(); + } + }); + MqttConnectionOptions options = new MqttConnectionOptionsBuilder() + .username(fullUser) + .password(fullPass.getBytes(StandardCharsets.UTF_8)) + .build(); + client1.connect(options); + client1.subscribe(TOPIC, 1); + + // consumer to generate the will + MqttClient client2 = createPahoClient("willGenerator"); + options = new MqttConnectionOptionsBuilder() + .username(username) + .password(password.getBytes(StandardCharsets.UTF_8)) + .will(TOPIC, new MqttMessage(WILL)) + .build(); + client2.connect(options); + client2.disconnectForcibly(0, 0, false); + + assertEquals(succeed, latch.await(2, TimeUnit.SECONDS)); + } }