This commit is contained in:
Justin Bertram 2022-08-17 12:29:02 -05:00
commit 05b3879cba
No known key found for this signature in database
GPG Key ID: F41830B875BB8633
4 changed files with 58 additions and 5 deletions

View File

@ -80,4 +80,8 @@ public interface MQTTLogger extends BasicLogger {
@LogMessage(level = Logger.Level.ERROR)
@Message(id = 834006, value = "Failed to publish MQTT message: {0}.", format = Message.Format.MESSAGE_FORMAT)
void failedToPublishMqttMessage(String exceptionMessage, @Cause Throwable t);
@LogMessage(level = Logger.Level.ERROR)
@Message(id = 834007, value = "Authorization failure sending will message: {0}", format = Message.Format.MESSAGE_FORMAT)
void authorizationFailureSendingWillMessage(String message);
}

View File

@ -214,11 +214,7 @@ public class MQTTPublishManager {
Transaction tx = session.getServerSession().newTransaction();
try {
if (internal) {
session.getServer().getPostOffice().route(serverMessage, tx, true);
} else {
session.getServerSession().send(tx, serverMessage, true, false);
}
session.getServerSession().send(tx, serverMessage, true, false);
if (message.fixedHeader().isRetain()) {
ByteBuf payload = message.payload();
@ -228,6 +224,9 @@ public class MQTTPublishManager {
tx.commit();
} catch (ActiveMQSecurityException e) {
tx.rollback();
if (internal) {
throw e;
}
if (session.getVersion() == MQTTVersion.MQTT_5) {
sendMessageAck(internal, qos, messageId, MQTTReasonCodes.NOT_AUTHORIZED);
return;

View File

@ -25,6 +25,7 @@ import io.netty.handler.codec.mqtt.MqttMessageBuilders;
import io.netty.handler.codec.mqtt.MqttProperties;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
import io.netty.handler.codec.mqtt.MqttQoS;
import org.apache.activemq.artemis.api.core.ActiveMQSecurityException;
import org.apache.activemq.artemis.core.config.WildcardConfiguration;
import org.apache.activemq.artemis.core.persistence.CoreMessageObjectPools;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
@ -277,6 +278,8 @@ public class MQTTSession {
getMqttPublishManager().sendToQueue(publishMessage, true);
state.setWillSent(true);
state.setWillMessage(null);
} catch (ActiveMQSecurityException e) {
MQTTLogger.LOGGER.authorizationFailureSendingWillMessage(e.getMessage());
} catch (Exception e) {
MQTTLogger.LOGGER.errorSendingWillMessage(e);
}

View File

@ -18,13 +18,17 @@
package org.apache.activemq.artemis.tests.integration.mqtt5.spec.controlpackets;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTReasonCodes;
import org.apache.activemq.artemis.tests.integration.mqtt5.MQTT5TestSupport;
import org.apache.activemq.artemis.tests.util.RandomUtil;
import org.eclipse.paho.mqttv5.client.MqttClient;
import org.eclipse.paho.mqttv5.client.MqttConnectionOptions;
import org.eclipse.paho.mqttv5.client.MqttConnectionOptionsBuilder;
import org.eclipse.paho.mqttv5.common.MqttException;
import org.eclipse.paho.mqttv5.common.MqttMessage;
import org.jboss.logging.Logger;
import org.junit.Test;
@ -83,4 +87,47 @@ public class PublishTestsWithSecurity extends MQTT5TestSupport {
client.isConnected();
}
@Test(timeout = DEFAULT_TIMEOUT)
public void testWillAuthorizationSuccess() throws Exception {
internalTestWillAuthorization(fullUser, fullPass, true);
}
@Test(timeout = DEFAULT_TIMEOUT)
public void testWillAuthorizationFailure() throws Exception {
internalTestWillAuthorization(noprivUser, noprivPass, false);
}
private void internalTestWillAuthorization(String username, String password, boolean succeed) throws Exception {
final byte[] WILL = RandomUtil.randomBytes();
final String TOPIC = RandomUtil.randomString();
// consumer of the will message
MqttClient client1 = createPahoClient("willConsumer");
CountDownLatch latch = new CountDownLatch(1);
client1.setCallback(new DefaultMqttCallback() {
@Override
public void messageArrived(String topic, MqttMessage message) {
latch.countDown();
}
});
MqttConnectionOptions options = new MqttConnectionOptionsBuilder()
.username(fullUser)
.password(fullPass.getBytes(StandardCharsets.UTF_8))
.build();
client1.connect(options);
client1.subscribe(TOPIC, 1);
// consumer to generate the will
MqttClient client2 = createPahoClient("willGenerator");
options = new MqttConnectionOptionsBuilder()
.username(username)
.password(password.getBytes(StandardCharsets.UTF_8))
.will(TOPIC, new MqttMessage(WILL))
.build();
client2.connect(options);
client2.disconnectForcibly(0, 0, false);
assertEquals(succeed, latch.await(2, TimeUnit.SECONDS));
}
}