ARTEMIS-3942 use session instead of direct routing for MQTT LWT messages

Using direct routing skips authorization for "Last Will and Testament"
messages (a.k.a. "will" messages). This commit fixes that problem by
using the internal session that is established for normal message
production and consumption.
This commit is contained in:
Justin Bertram 2022-08-17 11:33:15 -05:00
parent aa11a82e0b
commit 682f505e32
No known key found for this signature in database
GPG Key ID: F41830B875BB8633
4 changed files with 58 additions and 5 deletions

View File

@ -80,4 +80,8 @@ public interface MQTTLogger extends BasicLogger {
@LogMessage(level = Logger.Level.ERROR)
@Message(id = 834006, value = "Failed to publish MQTT message: {0}.", format = Message.Format.MESSAGE_FORMAT)
void failedToPublishMqttMessage(String exceptionMessage, @Cause Throwable t);
@LogMessage(level = Logger.Level.ERROR)
@Message(id = 834007, value = "Authorization failure sending will message: {0}", format = Message.Format.MESSAGE_FORMAT)
void authorizationFailureSendingWillMessage(String message);
}

View File

@ -214,11 +214,7 @@ public class MQTTPublishManager {
Transaction tx = session.getServerSession().newTransaction();
try {
if (internal) {
session.getServer().getPostOffice().route(serverMessage, tx, true);
} else {
session.getServerSession().send(tx, serverMessage, true, false);
}
if (message.fixedHeader().isRetain()) {
ByteBuf payload = message.payload();
@ -228,6 +224,9 @@ public class MQTTPublishManager {
tx.commit();
} catch (ActiveMQSecurityException e) {
tx.rollback();
if (internal) {
throw e;
}
if (session.getVersion() == MQTTVersion.MQTT_5) {
sendMessageAck(internal, qos, messageId, MQTTReasonCodes.NOT_AUTHORIZED);
return;

View File

@ -25,6 +25,7 @@ import io.netty.handler.codec.mqtt.MqttMessageBuilders;
import io.netty.handler.codec.mqtt.MqttProperties;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
import io.netty.handler.codec.mqtt.MqttQoS;
import org.apache.activemq.artemis.api.core.ActiveMQSecurityException;
import org.apache.activemq.artemis.core.config.WildcardConfiguration;
import org.apache.activemq.artemis.core.persistence.CoreMessageObjectPools;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
@ -277,6 +278,8 @@ public class MQTTSession {
getMqttPublishManager().sendToQueue(publishMessage, true);
state.setWillSent(true);
state.setWillMessage(null);
} catch (ActiveMQSecurityException e) {
MQTTLogger.LOGGER.authorizationFailureSendingWillMessage(e.getMessage());
} catch (Exception e) {
MQTTLogger.LOGGER.errorSendingWillMessage(e);
}

View File

@ -18,13 +18,17 @@
package org.apache.activemq.artemis.tests.integration.mqtt5.spec.controlpackets;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTReasonCodes;
import org.apache.activemq.artemis.tests.integration.mqtt5.MQTT5TestSupport;
import org.apache.activemq.artemis.tests.util.RandomUtil;
import org.eclipse.paho.mqttv5.client.MqttClient;
import org.eclipse.paho.mqttv5.client.MqttConnectionOptions;
import org.eclipse.paho.mqttv5.client.MqttConnectionOptionsBuilder;
import org.eclipse.paho.mqttv5.common.MqttException;
import org.eclipse.paho.mqttv5.common.MqttMessage;
import org.jboss.logging.Logger;
import org.junit.Test;
@ -83,4 +87,47 @@ public class PublishTestsWithSecurity extends MQTT5TestSupport {
client.isConnected();
}
@Test(timeout = DEFAULT_TIMEOUT)
public void testWillAuthorizationSuccess() throws Exception {
internalTestWillAuthorization(fullUser, fullPass, true);
}
@Test(timeout = DEFAULT_TIMEOUT)
public void testWillAuthorizationFailure() throws Exception {
internalTestWillAuthorization(noprivUser, noprivPass, false);
}
private void internalTestWillAuthorization(String username, String password, boolean succeed) throws Exception {
final byte[] WILL = RandomUtil.randomBytes();
final String TOPIC = RandomUtil.randomString();
// consumer of the will message
MqttClient client1 = createPahoClient("willConsumer");
CountDownLatch latch = new CountDownLatch(1);
client1.setCallback(new DefaultMqttCallback() {
@Override
public void messageArrived(String topic, MqttMessage message) {
latch.countDown();
}
});
MqttConnectionOptions options = new MqttConnectionOptionsBuilder()
.username(fullUser)
.password(fullPass.getBytes(StandardCharsets.UTF_8))
.build();
client1.connect(options);
client1.subscribe(TOPIC, 1);
// consumer to generate the will
MqttClient client2 = createPahoClient("willGenerator");
options = new MqttConnectionOptionsBuilder()
.username(username)
.password(password.getBytes(StandardCharsets.UTF_8))
.will(TOPIC, new MqttMessage(WILL))
.build();
client2.connect(options);
client2.disconnectForcibly(0, 0, false);
assertEquals(succeed, latch.await(2, TimeUnit.SECONDS));
}
}