diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java index eb9c631b1a..ae0c0edb4d 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java @@ -32,6 +32,7 @@ import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.io.IOCallback; import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.ServerConsumer; +import org.apache.activemq.artemis.core.server.impl.ServerSessionImpl; import org.apache.activemq.artemis.core.transaction.Transaction; import org.jboss.logging.Logger; @@ -68,7 +69,10 @@ public class MQTTPublishManager { } synchronized void stop() throws Exception { - session.getServerSession().removeProducer(session.getServerSession().getName()); + ServerSessionImpl serversession = session.getServerSession(); + if (serversession != null) { + serversession.removeProducer(serversession.getName()); + } if (managementConsumer != null) { managementConsumer.removeItself(); managementConsumer.setStarted(false); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTSecurityTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTSecurityTest.java new file mode 100644 index 0000000000..fd608ab44c --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTSecurityTest.java @@ -0,0 +1,76 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.mqtt.imported; + +import java.io.EOFException; +import java.util.Arrays; + +import org.apache.activemq.artemis.tests.util.Wait; +import org.fusesource.mqtt.client.BlockingConnection; +import org.fusesource.mqtt.client.MQTT; +import org.jboss.logmanager.Level; +import org.jboss.logmanager.Logger; +import org.junit.Test; + +public class MQTTSecurityTest extends MQTTTestSupport { + + @Override + public boolean isSecurityEnabled() { + return true; + } + + @Test(timeout = 30000) + public void testConnection() throws Exception { + for (String version : Arrays.asList("3.1", "3.1.1")) { + + BlockingConnection connection = null; + try { + MQTT mqtt = createMQTTConnection("test-" + version, true); + mqtt.setUserName(fullUser); + mqtt.setPassword(fullPass); + mqtt.setConnectAttemptsMax(1); + mqtt.setVersion(version); + connection = mqtt.blockingConnection(); + connection.connect(); + BlockingConnection finalConnection = connection; + assertTrue("Should be connected", Wait.waitFor(() -> finalConnection.isConnected(), 5000, 100)); + } finally { + if (connection != null && connection.isConnected()) connection.disconnect(); + } + } + } + + @Test(timeout = 30000, expected = EOFException.class) + public void testConnectionWithNullPassword() throws Exception { + for (String version : Arrays.asList("3.1", "3.1.1")) { + + BlockingConnection connection = null; + try { + MQTT mqtt = createMQTTConnection("test-" + version, true); + mqtt.setUserName(fullUser); + mqtt.setPassword((String) null); + mqtt.setConnectAttemptsMax(1); + mqtt.setVersion(version); + connection = mqtt.blockingConnection(); + connection.connect(); + fail("Connect should fail"); + } finally { + if (connection != null && connection.isConnected()) connection.disconnect(); + } + } + } +} diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTestSupport.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTestSupport.java index bac2e37b49..6a22b76a0a 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTestSupport.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTestSupport.java @@ -28,9 +28,7 @@ import java.security.ProtectionDomain; import java.security.SecureRandom; import java.security.cert.CertificateException; import java.security.cert.X509Certificate; -import java.util.HashMap; -import java.util.LinkedList; -import java.util.Map; +import java.util.*; import java.util.concurrent.TimeUnit; import io.netty.handler.codec.mqtt.MqttMessage; @@ -41,10 +39,13 @@ import org.apache.activemq.artemis.api.core.TransportConfiguration; import org.apache.activemq.artemis.core.config.Configuration; import org.apache.activemq.artemis.core.protocol.mqtt.MQTTInterceptor; import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants; +import org.apache.activemq.artemis.core.security.Role; import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.settings.HierarchicalRepository; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; +import org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; import org.fusesource.mqtt.client.MQTT; import org.fusesource.mqtt.client.Tracer; @@ -78,6 +79,18 @@ public class MQTTTestSupport extends ActiveMQTestBase { public static final int AT_LEAST_ONCE = 1; public static final int EXACTLY_ONCE = 2; + protected String noprivUser = "noprivs"; + protected String noprivPass = "noprivs"; + + protected String browseUser = "browser"; + protected String browsePass = "browser"; + + protected String guestUser = "guest"; + protected String guestPass = "guest"; + + protected String fullUser = "user"; + protected String fullPass = "pass"; + @Rule public TestName name = new TestName(); @@ -139,10 +152,43 @@ public class MQTTTestSupport extends ActiveMQTestBase { addressSettings.setMaxSizeBytes(999999999); addressSettings.setAutoCreateQueues(true); addressSettings.setAutoCreateAddresses(true); + configureBrokerSecurity(server); server.getAddressSettingsRepository().addMatch("#", addressSettings); } + /** + * Copied from org.apache.activemq.artemis.tests.integration.amqp.AmqpClientTestSupport#configureBrokerSecurity() + */ + protected void configureBrokerSecurity(ActiveMQServer server) { + if (isSecurityEnabled()) { + ActiveMQJAASSecurityManager securityManager = (ActiveMQJAASSecurityManager) server.getSecurityManager(); + + // User additions + securityManager.getConfiguration().addUser(noprivUser, noprivPass); + securityManager.getConfiguration().addRole(noprivUser, "nothing"); + securityManager.getConfiguration().addUser(browseUser, browsePass); + securityManager.getConfiguration().addRole(browseUser, "browser"); + securityManager.getConfiguration().addUser(guestUser, guestPass); + securityManager.getConfiguration().addRole(guestUser, "guest"); + securityManager.getConfiguration().addUser(fullUser, fullPass); + securityManager.getConfiguration().addRole(fullUser, "full"); + + // Configure roles + HierarchicalRepository> securityRepository = server.getSecurityRepository(); + HashSet value = new HashSet<>(); + value.add(new Role("nothing", false, false, false, false, false, false, false, false)); + value.add(new Role("browser", false, false, false, false, false, false, false, true)); + value.add(new Role("guest", false, true, false, false, false, false, false, true)); + value.add(new Role("full", true, true, true, true, true, true, true, true)); + securityRepository.addMatch(getQueueName(), value); + + server.getConfiguration().setSecurityEnabled(true); + } else { + server.getConfiguration().setSecurityEnabled(false); + } + } + public void startBroker() throws Exception { configureBroker(); server.start(); @@ -252,6 +298,10 @@ public class MQTTTestSupport extends ActiveMQTestBase { return false; } + public boolean isSecurityEnabled() { + return false; + } + protected interface Task { void run() throws Exception;