ARTEMIS-4137 MQTT sub-queue clean-up can fail due to auth

This commit is contained in:
Justin Bertram 2023-01-19 13:25:43 -06:00 committed by Bruscino Domenico Francesco
parent 3ef2fbe93a
commit b0ba8cae24
7 changed files with 49 additions and 10 deletions

View File

@ -83,7 +83,7 @@ public class MQTTConnectionManager {
/* [MQTT-3.1.2-6] If CleanSession is set to 1, the Client and Server MUST discard any previous Session and
* start a new one. This Session lasts as long as the Network Connection. State data associated with this Session
* MUST NOT be reused in any subsequent Session */
session.clean();
session.clean(true);
session.setClean(true);
}

View File

@ -132,7 +132,7 @@ public class MQTTSession {
// If the session expires the will message must be sent no matter the will delay
sendWillMessage();
}
clean();
clean(false);
protocolManager.removeSessionState(connection.getClientID());
} else {
state.setDisconnectedTime(System.currentTimeMillis());
@ -142,7 +142,7 @@ public class MQTTSession {
sendWillMessage();
}
if (isClean()) {
clean();
clean(false);
protocolManager.removeSessionState(connection.getClientID());
}
}
@ -222,8 +222,8 @@ public class MQTTSession {
return protocolManager;
}
void clean() throws Exception {
subscriptionManager.clean();
void clean(boolean enforceSecurity) throws Exception {
subscriptionManager.clean(enforceSecurity);
mqttPublishManager.clean();
state.clear();
}

View File

@ -252,6 +252,10 @@ public class MQTTSubscriptionManager {
}
private short removeSubscription(String address) {
return removeSubscription(address, true);
}
private short removeSubscription(String address, boolean enforceSecurity) {
if (session.getState().getSubscription(address) == null) {
return MQTTReasonCodes.NO_SUBSCRIPTION_EXISTED;
}
@ -290,7 +294,7 @@ public class MQTTSubscriptionManager {
if (queue.isConfigurationManaged()) {
queue.deleteAllReferences();
} else {
session.getServerSession().deleteQueue(internalQueueName);
session.getServerSession().deleteQueue(internalQueueName, enforceSecurity);
}
}
} catch (Exception e) {
@ -367,9 +371,9 @@ public class MQTTSubscriptionManager {
return consumerQoSLevels;
}
void clean() {
void clean(boolean enforceSecurity) {
for (MqttTopicSubscription mqttTopicSubscription : session.getState().getSubscriptions()) {
removeSubscription(mqttTopicSubscription.topicName());
removeSubscription(mqttTopicSubscription.topicName(), enforceSecurity);
}
}
}

View File

@ -308,6 +308,8 @@ public interface ServerSession extends SecurityAuth {
void deleteQueue(SimpleString name) throws Exception;
void deleteQueue(SimpleString name, boolean enforceSecurity) throws Exception;
ServerConsumer createConsumer(long consumerID,
SimpleString queueName,
SimpleString filterString,

View File

@ -1167,6 +1167,11 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
@Override
public void deleteQueue(final SimpleString queueToDelete) throws Exception {
deleteQueue(queueToDelete, true);
}
@Override
public void deleteQueue(final SimpleString queueToDelete, boolean enforceSecurity) throws Exception {
if (AuditLogger.isBaseLoggingEnabled()) {
AuditLogger.destroyQueue(this, remotingConnection.getSubject(), remotingConnection.getRemoteAddress(), queueToDelete);
}
@ -1178,7 +1183,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
throw new ActiveMQNonExistentQueueException();
}
server.destroyQueue(unPrefixedQueueName, this, true, false, true);
server.destroyQueue(unPrefixedQueueName, enforceSecurity ? this : null, true, false, true);
TempQueueCleanerUpper cleaner = this.tempQueueCleannerUppers.remove(unPrefixedQueueName);

View File

@ -142,6 +142,9 @@ public class MQTT5TestSupport extends ActiveMQTestBase {
protected String fullUser = "user";
protected String fullPass = "pass";
protected String noDeleteUser = "noDelete";
protected String noDeletePass = "noDelete";
@Rule
public TestName name = new TestName();
@ -212,6 +215,8 @@ public class MQTT5TestSupport extends ActiveMQTestBase {
securityManager.getConfiguration().addRole(guestUser, "guest");
securityManager.getConfiguration().addUser(fullUser, fullPass);
securityManager.getConfiguration().addRole(fullUser, "full");
securityManager.getConfiguration().addUser(noDeleteUser, noDeleteUser);
securityManager.getConfiguration().addRole(noDeleteUser, "noDelete");
// Configure roles
HierarchicalRepository<Set<Role>> securityRepository = server.getSecurityRepository();
@ -221,6 +226,7 @@ public class MQTT5TestSupport extends ActiveMQTestBase {
value.add(new Role("guest", false, true, false, false, false, false, false, true, false, false));
value.add(new Role("full", true, true, true, true, true, true, true, true, true, true));
value.add(new Role("createAddress", false, false, false, false, false, false, false, false, true, false));
value.add(new Role("noDelete", true, true, true, false, true, false, true, true, true, true));
securityRepository.addMatch("#", value);
server.getConfiguration().setSecurityEnabled(true);
@ -344,7 +350,12 @@ public class MQTT5TestSupport extends ActiveMQTestBase {
protected Queue getSubscriptionQueue(String TOPIC) {
try {
return ((LocalQueueBinding)server.getPostOffice().getBindingsForAddress(SimpleString.toSimpleString(TOPIC)).getBindings().toArray()[0]).getQueue();
Object[] array = server.getPostOffice().getBindingsForAddress(SimpleString.toSimpleString(TOPIC)).getBindings().toArray();
if (array.length == 0) {
return null;
} else {
return ((LocalQueueBinding)array[0]).getQueue();
}
} catch (Exception e) {
e.printStackTrace();
return null;

View File

@ -21,6 +21,7 @@ import java.nio.charset.StandardCharsets;
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTReasonCodes;
import org.apache.activemq.artemis.tests.integration.mqtt5.MQTT5TestSupport;
import org.apache.activemq.artemis.tests.util.RandomUtil;
import org.apache.activemq.artemis.tests.util.Wait;
import org.eclipse.paho.mqttv5.client.IMqttToken;
import org.eclipse.paho.mqttv5.client.MqttClient;
import org.eclipse.paho.mqttv5.client.MqttConnectionOptions;
@ -92,4 +93,20 @@ public class SubscribeTestsWithSecurity extends MQTT5TestSupport {
client.disconnect();
}
@Test(timeout = DEFAULT_TIMEOUT)
public void testSubscriptionQueueRemoved() throws Exception {
final String CLIENT_ID = "consumer";
MqttConnectionOptions options = new MqttConnectionOptionsBuilder()
.username(noDeleteUser)
.password(noDeletePass.getBytes(StandardCharsets.UTF_8))
.build();
MqttClient client = createPahoClient(CLIENT_ID);
client.connect(options);
client.subscribe(getTopicName(), 0).waitForCompletion();
client.disconnect();
Wait.assertTrue(() -> getSubscriptionQueue(getTopicName()) == null, 2000, 100);
}
}