ARTEMIS-4249 failure to create internal MQTT consumer can orphan sub q
This commit is contained in:
parent
9bac93e25e
commit
fde9d223ae
|
@ -108,6 +108,7 @@ public class MQTTSubscriptionManager {
|
|||
|
||||
Queue q = createQueueForSubscription(coreAddress, getQueueNameForTopic(rawTopicName));
|
||||
|
||||
try {
|
||||
if (initialStart) {
|
||||
createConsumerForSubscriptionQueue(q, parsedTopicName, qos, subscription.option().isNoLocal(), null);
|
||||
} else {
|
||||
|
@ -122,13 +123,17 @@ public class MQTTSubscriptionManager {
|
|||
}
|
||||
}
|
||||
|
||||
if (subscription.option().retainHandling() == MqttSubscriptionOption.RetainedHandlingPolicy.SEND_AT_SUBSCRIBE ||
|
||||
(subscription.option().retainHandling() == MqttSubscriptionOption.RetainedHandlingPolicy.SEND_AT_SUBSCRIBE_IF_NOT_YET_EXISTS && existingSubscription == null)) {
|
||||
if (subscription.option().retainHandling() == MqttSubscriptionOption.RetainedHandlingPolicy.SEND_AT_SUBSCRIBE || (subscription.option().retainHandling() == MqttSubscriptionOption.RetainedHandlingPolicy.SEND_AT_SUBSCRIBE_IF_NOT_YET_EXISTS && existingSubscription == null)) {
|
||||
session.getRetainMessageManager().addRetainedMessagesToQueue(q, parsedTopicName);
|
||||
}
|
||||
|
||||
session.getState().addSubscription(subscription, session.getWildcardConfiguration(), subscriptionIdentifier);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
// if anything broke during the creation of the consumer (or otherwise) then ensure the subscription queue is removed
|
||||
q.deleteQueue();
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
private String parseTopicName(String rawTopicName) {
|
||||
|
|
|
@ -472,4 +472,28 @@ public class MQTT5Test extends MQTT5TestSupport {
|
|||
client.disconnect();
|
||||
client.close();
|
||||
}
|
||||
|
||||
@Test(timeout = DEFAULT_TIMEOUT)
|
||||
public void testQueueCleanedUpOnConsumerFail() throws Exception {
|
||||
final String topic = getName();
|
||||
final String clientID = getName();
|
||||
|
||||
// force the creation of the consumer to fail
|
||||
server.getAddressSettingsRepository().addMatch(topic, new AddressSettings().setDefaultMaxConsumers(0));
|
||||
|
||||
MqttClient client = createPahoClient(clientID);
|
||||
client.connect();
|
||||
try {
|
||||
client.subscribe(topic, 1);
|
||||
} catch (Exception e) {
|
||||
// ignore
|
||||
}
|
||||
|
||||
Wait.assertTrue(() -> getSubscriptionQueue(topic, clientID) == null, 2000, 100);
|
||||
|
||||
if (client.isConnected()) {
|
||||
client.disconnect();
|
||||
}
|
||||
client.close();
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue