From 682f505e32f9b6472665212acd6f58c32c7bf98d Mon Sep 17 00:00:00 2001 From: Justin Bertram Date: Wed, 17 Aug 2022 11:33:15 -0500 Subject: [PATCH] ARTEMIS-3942 use session instead of direct routing for MQTT LWT messages Using direct routing skips authorization for "Last Will and Testament" messages (a.k.a. "will" messages). This commit fixes that problem by using the internal session that is established for normal message production and consumption. --- .../core/protocol/mqtt/MQTTLogger.java | 4 ++ .../protocol/mqtt/MQTTPublishManager.java | 9 ++-- .../core/protocol/mqtt/MQTTSession.java | 3 ++ .../PublishTestsWithSecurity.java | 47 +++++++++++++++++++ 4 files changed, 58 insertions(+), 5 deletions(-) diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTLogger.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTLogger.java index 17a1295eb6..a765de1e78 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTLogger.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTLogger.java @@ -80,4 +80,8 @@ public interface MQTTLogger extends BasicLogger { @LogMessage(level = Logger.Level.ERROR) @Message(id = 834006, value = "Failed to publish MQTT message: {0}.", format = Message.Format.MESSAGE_FORMAT) void failedToPublishMqttMessage(String exceptionMessage, @Cause Throwable t); + + @LogMessage(level = Logger.Level.ERROR) + @Message(id = 834007, value = "Authorization failure sending will message: {0}", format = Message.Format.MESSAGE_FORMAT) + void authorizationFailureSendingWillMessage(String message); } \ No newline at end of file diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java index ee1dfdd211..38d6e4cc44 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java @@ -214,11 +214,7 @@ public class MQTTPublishManager { Transaction tx = session.getServerSession().newTransaction(); try { - if (internal) { - session.getServer().getPostOffice().route(serverMessage, tx, true); - } else { - session.getServerSession().send(tx, serverMessage, true, false); - } + session.getServerSession().send(tx, serverMessage, true, false); if (message.fixedHeader().isRetain()) { ByteBuf payload = message.payload(); @@ -228,6 +224,9 @@ public class MQTTPublishManager { tx.commit(); } catch (ActiveMQSecurityException e) { tx.rollback(); + if (internal) { + throw e; + } if (session.getVersion() == MQTTVersion.MQTT_5) { sendMessageAck(internal, qos, messageId, MQTTReasonCodes.NOT_AUTHORIZED); return; diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java index d434ca55ba..db4295b100 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java @@ -25,6 +25,7 @@ import io.netty.handler.codec.mqtt.MqttMessageBuilders; import io.netty.handler.codec.mqtt.MqttProperties; import io.netty.handler.codec.mqtt.MqttPublishMessage; import io.netty.handler.codec.mqtt.MqttQoS; +import org.apache.activemq.artemis.api.core.ActiveMQSecurityException; import org.apache.activemq.artemis.core.config.WildcardConfiguration; import org.apache.activemq.artemis.core.persistence.CoreMessageObjectPools; import org.apache.activemq.artemis.core.server.ActiveMQServer; @@ -277,6 +278,8 @@ public class MQTTSession { getMqttPublishManager().sendToQueue(publishMessage, true); state.setWillSent(true); state.setWillMessage(null); + } catch (ActiveMQSecurityException e) { + MQTTLogger.LOGGER.authorizationFailureSendingWillMessage(e.getMessage()); } catch (Exception e) { MQTTLogger.LOGGER.errorSendingWillMessage(e); } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/controlpackets/PublishTestsWithSecurity.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/controlpackets/PublishTestsWithSecurity.java index ba107493ec..5b902fbcf7 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/controlpackets/PublishTestsWithSecurity.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/controlpackets/PublishTestsWithSecurity.java @@ -18,13 +18,17 @@ package org.apache.activemq.artemis.tests.integration.mqtt5.spec.controlpackets; import java.nio.charset.StandardCharsets; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import org.apache.activemq.artemis.core.protocol.mqtt.MQTTReasonCodes; import org.apache.activemq.artemis.tests.integration.mqtt5.MQTT5TestSupport; +import org.apache.activemq.artemis.tests.util.RandomUtil; import org.eclipse.paho.mqttv5.client.MqttClient; import org.eclipse.paho.mqttv5.client.MqttConnectionOptions; import org.eclipse.paho.mqttv5.client.MqttConnectionOptionsBuilder; import org.eclipse.paho.mqttv5.common.MqttException; +import org.eclipse.paho.mqttv5.common.MqttMessage; import org.jboss.logging.Logger; import org.junit.Test; @@ -83,4 +87,47 @@ public class PublishTestsWithSecurity extends MQTT5TestSupport { client.isConnected(); } + + @Test(timeout = DEFAULT_TIMEOUT) + public void testWillAuthorizationSuccess() throws Exception { + internalTestWillAuthorization(fullUser, fullPass, true); + } + + @Test(timeout = DEFAULT_TIMEOUT) + public void testWillAuthorizationFailure() throws Exception { + internalTestWillAuthorization(noprivUser, noprivPass, false); + } + + private void internalTestWillAuthorization(String username, String password, boolean succeed) throws Exception { + final byte[] WILL = RandomUtil.randomBytes(); + final String TOPIC = RandomUtil.randomString(); + + // consumer of the will message + MqttClient client1 = createPahoClient("willConsumer"); + CountDownLatch latch = new CountDownLatch(1); + client1.setCallback(new DefaultMqttCallback() { + @Override + public void messageArrived(String topic, MqttMessage message) { + latch.countDown(); + } + }); + MqttConnectionOptions options = new MqttConnectionOptionsBuilder() + .username(fullUser) + .password(fullPass.getBytes(StandardCharsets.UTF_8)) + .build(); + client1.connect(options); + client1.subscribe(TOPIC, 1); + + // consumer to generate the will + MqttClient client2 = createPahoClient("willGenerator"); + options = new MqttConnectionOptionsBuilder() + .username(username) + .password(password.getBytes(StandardCharsets.UTF_8)) + .will(TOPIC, new MqttMessage(WILL)) + .build(); + client2.connect(options); + client2.disconnectForcibly(0, 0, false); + + assertEquals(succeed, latch.await(2, TimeUnit.SECONDS)); + } }