From 6a2e964f5be5cc11a1d032dba1486cad9cb1ec59 Mon Sep 17 00:00:00 2001 From: Andy Date: Thu, 13 Jun 2019 10:43:10 +0100 Subject: [PATCH] ARTEMIS-2386 - use internal session for management queues for QOS2 https://issues.apache.org/jira/browse/ARTEMIS-2386 --- .../protocol/mqtt/MQTTConnectionManager.java | 5 +- .../protocol/mqtt/MQTTPublishManager.java | 8 +- .../core/protocol/mqtt/MQTTSession.java | 14 ++- .../imported/PahoMQTTQOS2SecurityTest.java | 112 ++++++++++++++++++ 4 files changed, 134 insertions(+), 5 deletions(-) create mode 100644 tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/PahoMQTTQOS2SecurityTest.java diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java index c24a684c4f..544c5f6d1d 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java @@ -81,7 +81,10 @@ public class MQTTConnectionManager { session.getConnection().setClientID(clientId); ServerSessionImpl serverSession = createServerSession(username, password); serverSession.start(); - session.setServerSession(serverSession); + ServerSessionImpl internalServerSession = createServerSession(username, password); + internalServerSession.disableSecurity(); + internalServerSession.start(); + session.setServerSession(serverSession, internalServerSession); if (cleanSession) { /* [MQTT-3.1.2-6] If CleanSession is set to 1, the Client and Server MUST discard any previous Session and diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java index ffd4eeb733..abcfe3f3d5 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java @@ -90,7 +90,7 @@ public class MQTTPublishManager { private void createManagementConsumer() throws Exception { long consumerId = session.getServer().getStorageManager().generateID(); - managementConsumer = session.getServerSession().createConsumer(consumerId, managementAddress, null, false, false, -1); + managementConsumer = session.getInternalServerSession().createConsumer(consumerId, managementAddress, null, false, false, -1); managementConsumer.setStarted(true); } @@ -206,7 +206,8 @@ public class MQTTPublishManager { Pair ref = outboundStore.publishReceived(messageId); if (ref != null) { Message m = MQTTUtil.createPubRelMessage(session, getManagementAddress(), messageId); - session.getServerSession().send(m, true); + //send the management message via the internal server session to bypass security. + session.getInternalServerSession().send(m, true); session.getServerSession().individualAcknowledge(ref.getB(), ref.getA()); } else { session.getProtocolHandler().sendPubRel(messageId); @@ -219,7 +220,8 @@ public class MQTTPublishManager { void handlePubComp(int messageId) throws Exception { Pair ref = session.getState().getOutboundStore().publishComplete(messageId); if (ref != null) { - session.getServerSession().individualAcknowledge(managementConsumer.getID(), ref.getA()); + //ack the message via the internal server session to bypass security. + session.getInternalServerSession().individualAcknowledge(managementConsumer.getID(), ref.getA()); } } diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java index b788f36578..1da256be31 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java @@ -37,6 +37,8 @@ public class MQTTSession { private ServerSessionImpl serverSession; + private ServerSessionImpl internalServerSession; + private MQTTPublishManager mqttPublishManager; private MQTTConnectionManager mqttConnectionManager; @@ -97,6 +99,11 @@ public class MQTTSession { serverSession.close(false); } + if (internalServerSession != null) { + internalServerSession.stop(); + internalServerSession.close(false); + } + if (state != null) { state.setAttached(false); } @@ -141,6 +148,10 @@ public class MQTTSession { return serverSession; } + ServerSessionImpl getInternalServerSession() { + return internalServerSession; + } + ActiveMQServer getServer() { return protocolHandler.getServer(); } @@ -157,8 +168,9 @@ public class MQTTSession { return sessionCallback; } - void setServerSession(ServerSessionImpl serverSession) { + void setServerSession(ServerSessionImpl serverSession, ServerSessionImpl internalServerSession) { this.serverSession = serverSession; + this.internalServerSession = internalServerSession; } void setSessionState(MQTTSessionState state) { diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/PahoMQTTQOS2SecurityTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/PahoMQTTQOS2SecurityTest.java new file mode 100644 index 0000000000..5e7e3ceb73 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/PahoMQTTQOS2SecurityTest.java @@ -0,0 +1,112 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.tests.integration.mqtt.imported; + +import org.apache.activemq.artemis.core.security.Role; +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.settings.HierarchicalRepository; +import org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager; +import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; +import org.eclipse.paho.client.mqttv3.MqttCallback; +import org.eclipse.paho.client.mqttv3.MqttClient; +import org.eclipse.paho.client.mqttv3.MqttConnectOptions; +import org.eclipse.paho.client.mqttv3.MqttException; +import org.eclipse.paho.client.mqttv3.MqttMessage; +import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; +import org.junit.Assert; +import org.junit.Test; + +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.CountDownLatch; + +public class PahoMQTTQOS2SecurityTest extends MQTTTestSupport { + + String user1 = "user1"; + String password1 = "password1"; + + @Override + protected void configureBrokerSecurity(ActiveMQServer server) { + super.configureBrokerSecurity(server); + ActiveMQJAASSecurityManager securityManager = (ActiveMQJAASSecurityManager) server.getSecurityManager(); + + // User additions + securityManager.getConfiguration().addUser(user1, password1); + securityManager.getConfiguration().addRole(user1, "addressOnly"); + + // Configure roles + HierarchicalRepository> securityRepository = server.getSecurityRepository(); + HashSet value = new HashSet<>(); + value.add(new Role("addressOnly", true, true, true, true, false, false, false, false, true, true)); + + securityRepository.addMatch(getQueueName(), value); + } + + @Override + public boolean isSecurityEnabled() { + return true; + } + + @Test(timeout = 300000) + public void testSendAndReceiveMQTT() throws Exception { + final CountDownLatch latch = new CountDownLatch(1); + + MqttClient consumer = createPahoClient("consumerId"); + MqttClient producer = createPahoClient("producerId"); + MqttConnectOptions conOpt = new MqttConnectOptions(); + conOpt.setCleanSession(true); + conOpt.setUserName(user1); + conOpt.setPassword(password1.toCharArray()); + consumer.connect(conOpt); + consumer.subscribe(getQueueName(), 2); + final boolean[] failed = new boolean[1]; + consumer.setCallback(new MqttCallback() { + + + @Override + public void connectionLost(Throwable cause) { + cause.printStackTrace(); + failed[0] = true; + latch.countDown(); + } + + @Override + public void messageArrived(String topic, MqttMessage message) throws Exception { + latch.countDown(); + } + + @Override + public void deliveryComplete(IMqttDeliveryToken token) { + + } + }); + + producer.connect(conOpt); + producer.publish(getQueueName(), "hello".getBytes(), 2, false); + + waitForLatch(latch); + producer.disconnect(); + producer.close(); + Assert.assertFalse(failed[0]); + } + + private MqttClient createPahoClient(String clientId) throws MqttException { + return new MqttClient("tcp://localhost:" + getPort(), clientId, new MemoryPersistence()); + } + +}