From 6ec7a69d45d2aad2222f444104de2824d83c929f Mon Sep 17 00:00:00 2001 From: Domenico Francesco Bruscino Date: Wed, 14 Apr 2021 10:20:37 +0200 Subject: [PATCH] ARTEMIS-3236 Preserve managed queues on removing MQTT subscription --- .../mqtt/MQTTSubscriptionManager.java | 12 ++++-- .../mqtt/imported/MQTTQueueCleanTest.java | 42 +++++++++++++++++++ 2 files changed, 51 insertions(+), 3 deletions(-) diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java index 160c62303f..c23f9945ec 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java @@ -196,6 +196,7 @@ public class MQTTSubscriptionManager { SimpleString internalQueueName = getQueueNameForTopic(internalAddress); session.getSessionState().removeSubscription(address); + Queue queue = session.getServer().locateQueue(internalQueueName); SimpleString sAddress = SimpleString.toSimpleString(internalAddress); AddressInfo addressInfo = session.getServerSession().getAddress(sAddress); if (addressInfo != null && addressInfo.getRoutingTypes().contains(RoutingType.ANYCAST)) { @@ -207,7 +208,6 @@ public class MQTTSubscriptionManager { } } else { consumers.remove(address); - Queue queue = session.getServer().locateQueue(internalQueueName); Set queueConsumers; if (queue != null && (queueConsumers = (Set) queue.getConsumers()) != null) { for (Consumer consumer : queueConsumers) { @@ -217,8 +217,14 @@ public class MQTTSubscriptionManager { } } - if (session.getServerSession().executeQueueQuery(internalQueueName).isExists()) { - session.getServerSession().deleteQueue(internalQueueName); + if (queue != null) { + assert session.getServerSession().executeQueueQuery(internalQueueName).isExists(); + + if (queue.isConfigurationManaged()) { + queue.deleteAllReferences(); + } else { + session.getServerSession().deleteQueue(internalQueueName); + } } } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTQueueCleanTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTQueueCleanTest.java index cb97e16bdd..cc9f9b26e1 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTQueueCleanTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTQueueCleanTest.java @@ -17,7 +17,10 @@ package org.apache.activemq.artemis.tests.integration.mqtt.imported; +import org.apache.activemq.artemis.api.core.QueueConfiguration; +import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.apache.activemq.artemis.tests.util.Wait; import org.junit.Test; import org.slf4j.Logger; @@ -31,6 +34,45 @@ public class MQTTQueueCleanTest extends MQTTTestSupport { private static final Logger LOG = LoggerFactory.getLogger(MQTTQueueCleanTest.class); + @Test + public void testQueueClean() throws Exception { + testQueueClean(false); + } + + @Test + public void testManagedQueueClean() throws Exception { + testQueueClean(true); + } + + private void testQueueClean(boolean managed) throws Exception { + String address = "clean/test"; + String clientId = "mqtt-client"; + String queueName = "::mqtt-client.clean.test"; + + if (managed) { + server.addAddressInfo(new AddressInfo(address) + .addRoutingType(RoutingType.MULTICAST)); + + server.createQueue(new QueueConfiguration(queueName) + .setAddress(address) + .setRoutingType(RoutingType.MULTICAST) + .setConfigurationManaged(true)); + } + + MQTTClientProvider clientProvider = getMQTTClientProvider(); + clientProvider.setClientId(clientId); + initializeConnection(clientProvider); + clientProvider.subscribe(address, AT_LEAST_ONCE); + clientProvider.disconnect(); + + if (managed) { + assertTrue(Wait.waitFor(() -> server.locateQueue(SimpleString.toSimpleString(queueName)) != null && + server.locateQueue(SimpleString.toSimpleString(queueName)).getConsumerCount() == 0, 5000, 10)); + } else { + assertTrue(Wait.waitFor(() -> server.locateQueue(SimpleString.toSimpleString(queueName)) == null, 5000, 10)); + } + } + @Test public void testQueueCleanWhenConnectionSynExeConnectAndDisconnect() throws Exception { Random random = new Random();