This commit is contained in:
Justin Bertram 2021-05-25 11:24:54 -05:00
commit 67949b9484
2 changed files with 51 additions and 3 deletions

View File

@ -196,6 +196,7 @@ public class MQTTSubscriptionManager {
SimpleString internalQueueName = getQueueNameForTopic(internalAddress);
session.getSessionState().removeSubscription(address);
Queue queue = session.getServer().locateQueue(internalQueueName);
SimpleString sAddress = SimpleString.toSimpleString(internalAddress);
AddressInfo addressInfo = session.getServerSession().getAddress(sAddress);
if (addressInfo != null && addressInfo.getRoutingTypes().contains(RoutingType.ANYCAST)) {
@ -207,7 +208,6 @@ public class MQTTSubscriptionManager {
}
} else {
consumers.remove(address);
Queue queue = session.getServer().locateQueue(internalQueueName);
Set<Consumer> queueConsumers;
if (queue != null && (queueConsumers = (Set<Consumer>) queue.getConsumers()) != null) {
for (Consumer consumer : queueConsumers) {
@ -217,10 +217,16 @@ public class MQTTSubscriptionManager {
}
}
if (session.getServerSession().executeQueueQuery(internalQueueName).isExists()) {
if (queue != null) {
assert session.getServerSession().executeQueueQuery(internalQueueName).isExists();
if (queue.isConfigurationManaged()) {
queue.deleteAllReferences();
} else {
session.getServerSession().deleteQueue(internalQueueName);
}
}
}
private SimpleString getQueueNameForTopic(String topic) {
return new SimpleString(session.getSessionState().getClientId() + "." + topic);

View File

@ -17,7 +17,10 @@
package org.apache.activemq.artemis.tests.integration.mqtt.imported;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.tests.util.Wait;
import org.junit.Test;
import org.slf4j.Logger;
@ -31,6 +34,45 @@ public class MQTTQueueCleanTest extends MQTTTestSupport {
private static final Logger LOG = LoggerFactory.getLogger(MQTTQueueCleanTest.class);
@Test
public void testQueueClean() throws Exception {
testQueueClean(false);
}
@Test
public void testManagedQueueClean() throws Exception {
testQueueClean(true);
}
private void testQueueClean(boolean managed) throws Exception {
String address = "clean/test";
String clientId = "mqtt-client";
String queueName = "::mqtt-client.clean.test";
if (managed) {
server.addAddressInfo(new AddressInfo(address)
.addRoutingType(RoutingType.MULTICAST));
server.createQueue(new QueueConfiguration(queueName)
.setAddress(address)
.setRoutingType(RoutingType.MULTICAST)
.setConfigurationManaged(true));
}
MQTTClientProvider clientProvider = getMQTTClientProvider();
clientProvider.setClientId(clientId);
initializeConnection(clientProvider);
clientProvider.subscribe(address, AT_LEAST_ONCE);
clientProvider.disconnect();
if (managed) {
assertTrue(Wait.waitFor(() -> server.locateQueue(SimpleString.toSimpleString(queueName)) != null &&
server.locateQueue(SimpleString.toSimpleString(queueName)).getConsumerCount() == 0, 5000, 10));
} else {
assertTrue(Wait.waitFor(() -> server.locateQueue(SimpleString.toSimpleString(queueName)) == null, 5000, 10));
}
}
@Test
public void testQueueCleanWhenConnectionSynExeConnectAndDisconnect() throws Exception {
Random random = new Random();