ARTEMIS-4109 unable to auto-delete q for MQTT retained msg

This commit is contained in:
Justin Bertram 2022-12-05 16:11:16 -06:00
parent df81bfa567
commit 038e95adb9
3 changed files with 95 additions and 5 deletions

View File

@ -50,7 +50,7 @@ public class MQTTRetainMessageManager {
Queue queue = session.getServer().locateQueue(retainAddress);
if (queue == null) {
queue = session.getServer().createQueue(new QueueConfiguration(retainAddress));
queue = session.getServer().createQueue(new QueueConfiguration(retainAddress).setAutoCreated(true));
}
queue.deleteAllReferences();

View File

@ -67,14 +67,32 @@ sent or received:
## MQTT Retain Messages
MQTT has an interesting feature in which messages can be "retained" for a
particular address. This means that once a retain message has been sent to an
address, any new subscribers to that address will receive the last sent retain
message before any others messages, this happens even if the retained message
particular address. This means that once a retain message has been sent to an
address, any new subscribers to that address will receive the last sent retained
message before any others messages. This happens even if the retained message
was sent before a client has connected or subscribed. An example of where this
feature might be useful is in environments such as IoT where devices need to
quickly get the current state of a system when they are on boarded into a
system.
Retained messages are stored in a queue named with a special prefix according to
the name of the topic where they were originally sent. For example, a retained
message sent to the topic `/abc/123` will be stored in a multicast queue named
`$sys.mqtt.retain.abc.123` with an address of the same name. The MQTT
specification doesn't define how long retained messages should be stored so the
broker will hold on to this data until a client explicitly deletes the retained
message or it potentially expires. However, even at that point the queue and
address for the retained message will remain. These resources can be
automatically deleted via the following `address-setting`:
```xml
<address-setting match="$sys.mqtt.retain.#">
<auto-delete-queues>true</auto-delete-queues>
<auto-delete-addresses>true</auto-delete-addresses>
</address-setting>
```
Keep in mind that it's also possible to automatically apply an [`expiry-delay`](message-expiry.md)
to retained messages as well.
## Will Messages
A will message can be sent when a client initially connects to a broker.

View File

@ -88,9 +88,9 @@ public class MQTTTest extends MQTTTestSupport {
public void configureBroker() throws Exception {
super.configureBroker();
server.getConfiguration().setAddressQueueScanPeriod(100);
server.getConfiguration().setMessageExpiryScanPeriod(100);
}
@Test
public void testConnectWithLargePassword() throws Exception {
for (String version : Arrays.asList("3.1", "3.1.1")) {
@ -2086,4 +2086,76 @@ public class MQTTTest extends MQTTTestSupport {
Wait.assertTrue(() -> server.getAddressInfo(SimpleString.toSimpleString("foo.bar")) == null);
}
@Test(timeout = 60 * 1000)
public void testAutoDeleteRetainedQueue() throws Exception {
final String TOPIC = "/abc/123";
final String RETAINED_QUEUE = MQTTUtil.convertMqttTopicFilterToCoreAddress(MQTTUtil.MQTT_RETAIN_ADDRESS_PREFIX, TOPIC, server.getConfiguration().getWildcardConfiguration());
final MQTTClientProvider publisher = getMQTTClientProvider();
final MQTTClientProvider subscriber = getMQTTClientProvider();
server.getAddressSettingsRepository().addMatch(MQTTUtil.convertMqttTopicFilterToCoreAddress("#", server.getConfiguration().getWildcardConfiguration()), new AddressSettings().setExpiryDelay(500L).setAutoDeleteQueues(true).setAutoDeleteAddresses(true));
initializeConnection(publisher);
initializeConnection(subscriber);
String RETAINED = "retained";
publisher.publish(TOPIC, RETAINED.getBytes(), AT_LEAST_ONCE, true);
List<String> messages = new ArrayList<>();
for (int i = 0; i < 10; i++) {
messages.add("TEST MESSAGE:" + i);
}
subscriber.subscribe(TOPIC, AT_LEAST_ONCE);
for (int i = 0; i < 10; i++) {
publisher.publish(TOPIC, messages.get(i).getBytes(), AT_LEAST_ONCE);
}
byte[] msg = subscriber.receive(5000);
assertNotNull(msg);
assertEquals(RETAINED, new String(msg));
for (int i = 0; i < 10; i++) {
msg = subscriber.receive(5000);
assertNotNull(msg);
assertEquals(messages.get(i), new String(msg));
}
subscriber.disconnect();
publisher.disconnect();
Wait.assertTrue(() -> server.locateQueue(RETAINED_QUEUE).getMessageCount() == 0, 2000, 50);
Wait.assertTrue(() -> server.locateQueue(RETAINED_QUEUE) == null, 2000, 50);
// now that we auto-deleted do it again to ensure it is recreated and auto-deleted properly
initializeConnection(publisher);
initializeConnection(subscriber);
publisher.publish(TOPIC, RETAINED.getBytes(), AT_LEAST_ONCE, true);
subscriber.subscribe(TOPIC, AT_LEAST_ONCE);
for (int i = 0; i < 10; i++) {
publisher.publish(TOPIC, messages.get(i).getBytes(), AT_LEAST_ONCE);
}
msg = subscriber.receive(5000);
assertNotNull(msg);
assertEquals(RETAINED, new String(msg));
for (int i = 0; i < 10; i++) {
msg = subscriber.receive(5000);
assertNotNull(msg);
assertEquals(messages.get(i), new String(msg));
}
subscriber.disconnect();
publisher.disconnect();
Wait.assertTrue(() -> server.locateQueue(RETAINED_QUEUE).getMessageCount() == 0, 3000, 50);
Wait.assertTrue(() -> server.locateQueue(RETAINED_QUEUE) == null, 3000, 50);
}
}