diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java index bc762e88e9..176a1c18ca 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java @@ -108,26 +108,31 @@ public class MQTTSubscriptionManager { Queue q = createQueueForSubscription(coreAddress, getQueueNameForTopic(rawTopicName)); - if (initialStart) { - createConsumerForSubscriptionQueue(q, parsedTopicName, qos, subscription.option().isNoLocal(), null); - } else { - MqttTopicSubscription existingSubscription = session.getState().getSubscription(parsedTopicName); - if (existingSubscription == null) { + try { + if (initialStart) { createConsumerForSubscriptionQueue(q, parsedTopicName, qos, subscription.option().isNoLocal(), null); } else { - Long existingConsumerId = consumers.get(parsedTopicName).getID(); - consumerQoSLevels.put(existingConsumerId, qos); - if (existingSubscription.option().isNoLocal() != subscription.option().isNoLocal()) { - createConsumerForSubscriptionQueue(q, parsedTopicName, qos, subscription.option().isNoLocal(), existingConsumerId); + MqttTopicSubscription existingSubscription = session.getState().getSubscription(parsedTopicName); + if (existingSubscription == null) { + createConsumerForSubscriptionQueue(q, parsedTopicName, qos, subscription.option().isNoLocal(), null); + } else { + Long existingConsumerId = consumers.get(parsedTopicName).getID(); + consumerQoSLevels.put(existingConsumerId, qos); + if (existingSubscription.option().isNoLocal() != subscription.option().isNoLocal()) { + createConsumerForSubscriptionQueue(q, parsedTopicName, qos, subscription.option().isNoLocal(), existingConsumerId); + } } - } - if (subscription.option().retainHandling() == MqttSubscriptionOption.RetainedHandlingPolicy.SEND_AT_SUBSCRIBE || - (subscription.option().retainHandling() == MqttSubscriptionOption.RetainedHandlingPolicy.SEND_AT_SUBSCRIBE_IF_NOT_YET_EXISTS && existingSubscription == null)) { - session.getRetainMessageManager().addRetainedMessagesToQueue(q, parsedTopicName); - } + if (subscription.option().retainHandling() == MqttSubscriptionOption.RetainedHandlingPolicy.SEND_AT_SUBSCRIBE || (subscription.option().retainHandling() == MqttSubscriptionOption.RetainedHandlingPolicy.SEND_AT_SUBSCRIBE_IF_NOT_YET_EXISTS && existingSubscription == null)) { + session.getRetainMessageManager().addRetainedMessagesToQueue(q, parsedTopicName); + } - session.getState().addSubscription(subscription, session.getWildcardConfiguration(), subscriptionIdentifier); + session.getState().addSubscription(subscription, session.getWildcardConfiguration(), subscriptionIdentifier); + } + } catch (Exception e) { + // if anything broke during the creation of the consumer (or otherwise) then ensure the subscription queue is removed + q.deleteQueue(); + throw e; } } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5Test.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5Test.java index 7ac7639304..85e0e3369a 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5Test.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5Test.java @@ -472,4 +472,28 @@ public class MQTT5Test extends MQTT5TestSupport { client.disconnect(); client.close(); } + + @Test(timeout = DEFAULT_TIMEOUT) + public void testQueueCleanedUpOnConsumerFail() throws Exception { + final String topic = getName(); + final String clientID = getName(); + + // force the creation of the consumer to fail + server.getAddressSettingsRepository().addMatch(topic, new AddressSettings().setDefaultMaxConsumers(0)); + + MqttClient client = createPahoClient(clientID); + client.connect(); + try { + client.subscribe(topic, 1); + } catch (Exception e) { + // ignore + } + + Wait.assertTrue(() -> getSubscriptionQueue(topic, clientID) == null, 2000, 100); + + if (client.isConnected()) { + client.disconnect(); + } + client.close(); + } }