diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java index ae6b56ce44..49ab5d9196 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java @@ -24,28 +24,29 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import io.netty.handler.codec.mqtt.MqttTopicSubscription; import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException; import org.apache.activemq.artemis.api.core.FilterConstants; +import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle; import org.apache.activemq.artemis.core.server.BindingQueryResult; import org.apache.activemq.artemis.core.server.Queue; -import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.core.server.ServerConsumer; import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.apache.activemq.artemis.utils.CompositeAddress; +import io.netty.handler.codec.mqtt.MqttTopicSubscription; + public class MQTTSubscriptionManager { - private MQTTSession session; + private final MQTTSession session; - private ConcurrentMap consumerQoSLevels; + private final ConcurrentMap consumerQoSLevels; - private ConcurrentMap consumers; + private final ConcurrentMap consumers; // We filter out Artemis management messages and notifications - private SimpleString managementFilter; + private final SimpleString managementFilter; public MQTTSubscriptionManager(MQTTSession session) { this.session = session; @@ -108,7 +109,8 @@ public class MQTTSubscriptionManager { if (!bindingQueryResult.isAutoCreateAddresses()) { throw ActiveMQMessageBundle.BUNDLE.addressDoesNotExist(SimpleString.toSimpleString(address)); } - addressInfo = session.getServerSession().createAddress(SimpleString.toSimpleString(address), RoutingType.MULTICAST, false); + addressInfo = session.getServerSession().createAddress(SimpleString.toSimpleString(address), + RoutingType.MULTICAST, true); } return findOrCreateQueue(bindingQueryResult, addressInfo, queue, qos); } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java index d5978b0156..bfc83e02ff 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java @@ -48,6 +48,7 @@ import org.apache.activemq.artemis.core.protocol.mqtt.MQTTUtil; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.impl.AddressInfo; +import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.tests.util.Wait; import org.apache.activemq.transport.amqp.client.AmqpClient; import org.apache.activemq.transport.amqp.client.AmqpConnection; @@ -1946,4 +1947,20 @@ public class MQTTTest extends MQTTTestSupport { connection2.disconnect(); } } + + @Test + public void autoDestroyAddress() throws Exception { + AddressSettings addressSettings = new AddressSettings(); + addressSettings.setAutoDeleteAddresses(true); + server.getAddressSettingsRepository().addMatch("foo.bar", addressSettings); + + final MQTTClientProvider subscriptionProvider = getMQTTClientProvider(); + initializeConnection(subscriptionProvider); + subscriptionProvider.subscribe("foo/bar", AT_MOST_ONCE); + assertNotNull(server.getAddressInfo(SimpleString.toSimpleString("foo.bar"))); + + subscriptionProvider.disconnect(); + + assertNull(server.getAddressInfo(SimpleString.toSimpleString("foo.bar"))); + } }