diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java index eb9c631b1a..ae0c0edb4d 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java @@ -32,6 +32,7 @@ import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.io.IOCallback; import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.ServerConsumer; +import org.apache.activemq.artemis.core.server.impl.ServerSessionImpl; import org.apache.activemq.artemis.core.transaction.Transaction; import org.jboss.logging.Logger; @@ -68,7 +69,10 @@ public class MQTTPublishManager { } synchronized void stop() throws Exception { - session.getServerSession().removeProducer(session.getServerSession().getName()); + ServerSessionImpl serversession = session.getServerSession(); + if (serversession != null) { + serversession.removeProducer(serversession.getName()); + } if (managementConsumer != null) { managementConsumer.removeItself(); managementConsumer.setStarted(false); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTSecurityTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTSecurityTest.java new file mode 100644 index 0000000000..fd608ab44c --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTSecurityTest.java @@ -0,0 +1,76 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *
+ * http://www.apache.org/licenses/LICENSE-2.0 + *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.mqtt.imported;
+
+import java.io.EOFException;
+import java.util.Arrays;
+
+import org.apache.activemq.artemis.tests.util.Wait;
+import org.fusesource.mqtt.client.BlockingConnection;
+import org.fusesource.mqtt.client.MQTT;
+import org.jboss.logmanager.Level;
+import org.jboss.logmanager.Logger;
+import org.junit.Test;
+
+public class MQTTSecurityTest extends MQTTTestSupport {
+
+ @Override
+ public boolean isSecurityEnabled() {
+ return true;
+ }
+
+ @Test(timeout = 30000)
+ public void testConnection() throws Exception {
+ for (String version : Arrays.asList("3.1", "3.1.1")) {
+
+ BlockingConnection connection = null;
+ try {
+ MQTT mqtt = createMQTTConnection("test-" + version, true);
+ mqtt.setUserName(fullUser);
+ mqtt.setPassword(fullPass);
+ mqtt.setConnectAttemptsMax(1);
+ mqtt.setVersion(version);
+ connection = mqtt.blockingConnection();
+ connection.connect();
+ BlockingConnection finalConnection = connection;
+ assertTrue("Should be connected", Wait.waitFor(() -> finalConnection.isConnected(), 5000, 100));
+ } finally {
+ if (connection != null && connection.isConnected()) connection.disconnect();
+ }
+ }
+ }
+
+ @Test(timeout = 30000, expected = EOFException.class)
+ public void testConnectionWithNullPassword() throws Exception {
+ for (String version : Arrays.asList("3.1", "3.1.1")) {
+
+ BlockingConnection connection = null;
+ try {
+ MQTT mqtt = createMQTTConnection("test-" + version, true);
+ mqtt.setUserName(fullUser);
+ mqtt.setPassword((String) null);
+ mqtt.setConnectAttemptsMax(1);
+ mqtt.setVersion(version);
+ connection = mqtt.blockingConnection();
+ connection.connect();
+ fail("Connect should fail");
+ } finally {
+ if (connection != null && connection.isConnected()) connection.disconnect();
+ }
+ }
+ }
+}
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTestSupport.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTestSupport.java
index bac2e37b49..6a22b76a0a 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTestSupport.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTestSupport.java
@@ -28,9 +28,7 @@ import java.security.ProtectionDomain;
import java.security.SecureRandom;
import java.security.cert.CertificateException;
import java.security.cert.X509Certificate;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.Map;
+import java.util.*;
import java.util.concurrent.TimeUnit;
import io.netty.handler.codec.mqtt.MqttMessage;
@@ -41,10 +39,13 @@ import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTInterceptor;
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
+import org.apache.activemq.artemis.core.security.Role;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
+import org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.fusesource.mqtt.client.MQTT;
import org.fusesource.mqtt.client.Tracer;
@@ -78,6 +79,18 @@ public class MQTTTestSupport extends ActiveMQTestBase {
public static final int AT_LEAST_ONCE = 1;
public static final int EXACTLY_ONCE = 2;
+ protected String noprivUser = "noprivs";
+ protected String noprivPass = "noprivs";
+
+ protected String browseUser = "browser";
+ protected String browsePass = "browser";
+
+ protected String guestUser = "guest";
+ protected String guestPass = "guest";
+
+ protected String fullUser = "user";
+ protected String fullPass = "pass";
+
@Rule
public TestName name = new TestName();
@@ -139,10 +152,43 @@ public class MQTTTestSupport extends ActiveMQTestBase {
addressSettings.setMaxSizeBytes(999999999);
addressSettings.setAutoCreateQueues(true);
addressSettings.setAutoCreateAddresses(true);
+ configureBrokerSecurity(server);
server.getAddressSettingsRepository().addMatch("#", addressSettings);
}
+ /**
+ * Copied from org.apache.activemq.artemis.tests.integration.amqp.AmqpClientTestSupport#configureBrokerSecurity()
+ */
+ protected void configureBrokerSecurity(ActiveMQServer server) {
+ if (isSecurityEnabled()) {
+ ActiveMQJAASSecurityManager securityManager = (ActiveMQJAASSecurityManager) server.getSecurityManager();
+
+ // User additions
+ securityManager.getConfiguration().addUser(noprivUser, noprivPass);
+ securityManager.getConfiguration().addRole(noprivUser, "nothing");
+ securityManager.getConfiguration().addUser(browseUser, browsePass);
+ securityManager.getConfiguration().addRole(browseUser, "browser");
+ securityManager.getConfiguration().addUser(guestUser, guestPass);
+ securityManager.getConfiguration().addRole(guestUser, "guest");
+ securityManager.getConfiguration().addUser(fullUser, fullPass);
+ securityManager.getConfiguration().addRole(fullUser, "full");
+
+ // Configure roles
+ HierarchicalRepository