ARTEMIS-3871 uniquely name MQTT share sub queues

This commit is contained in:
Justin Bertram 2022-11-14 11:00:34 -06:00 committed by clebertsuconic
parent 4af11a04d3
commit b5e25eb4fe
6 changed files with 157 additions and 37 deletions
artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt
docs/user-manual/en
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration

View File

@ -103,37 +103,35 @@ public class MQTTSubscriptionManager {
}
private void addSubscription(MqttTopicSubscription subscription, Integer subscriptionIdentifier, boolean initialStart) throws Exception {
String topicName = CompositeAddress.extractAddressName(subscription.topicName());
String sharedSubscriptionName = null;
String rawTopicName = CompositeAddress.extractAddressName(subscription.topicName());
String parsedTopicName = rawTopicName;
// if using a shared subscription then parse the subscription name and topic
if (topicName.startsWith(MQTTUtil.SHARED_SUBSCRIPTION_PREFIX)) {
int slashIndex = topicName.indexOf(SLASH) + 1;
sharedSubscriptionName = topicName.substring(slashIndex, topicName.indexOf(SLASH, slashIndex));
topicName = topicName.substring(topicName.indexOf(SLASH, slashIndex) + 1);
// if using a shared subscription then parse
if (rawTopicName.startsWith(MQTTUtil.SHARED_SUBSCRIPTION_PREFIX)) {
parsedTopicName = rawTopicName.substring(rawTopicName.indexOf(SLASH, rawTopicName.indexOf(SLASH) + 1) + 1);
}
int qos = subscription.qualityOfService().value();
String coreAddress = MQTTUtil.convertMqttTopicFilterToCoreAddress(topicName, session.getWildcardConfiguration());
String coreAddress = MQTTUtil.convertMqttTopicFilterToCoreAddress(parsedTopicName, session.getWildcardConfiguration());
Queue q = createQueueForSubscription(coreAddress, sharedSubscriptionName);
Queue q = createQueueForSubscription(coreAddress, getQueueNameForTopic(rawTopicName));
if (initialStart) {
createConsumerForSubscriptionQueue(q, topicName, qos, subscription.option().isNoLocal(), null);
createConsumerForSubscriptionQueue(q, parsedTopicName, qos, subscription.option().isNoLocal(), null);
} else {
MqttTopicSubscription existingSubscription = session.getState().getSubscription(topicName);
MqttTopicSubscription existingSubscription = session.getState().getSubscription(parsedTopicName);
if (existingSubscription == null) {
createConsumerForSubscriptionQueue(q, topicName, qos, subscription.option().isNoLocal(), null);
createConsumerForSubscriptionQueue(q, parsedTopicName, qos, subscription.option().isNoLocal(), null);
} else {
Long existingConsumerId = consumers.get(topicName).getID();
Long existingConsumerId = consumers.get(parsedTopicName).getID();
consumerQoSLevels.put(existingConsumerId, qos);
if (existingSubscription.option().isNoLocal() != subscription.option().isNoLocal()) {
createConsumerForSubscriptionQueue(q, topicName, qos, subscription.option().isNoLocal(), existingConsumerId);
createConsumerForSubscriptionQueue(q, parsedTopicName, qos, subscription.option().isNoLocal(), existingConsumerId);
}
}
if (subscription.option().retainHandling() == MqttSubscriptionOption.RetainedHandlingPolicy.SEND_AT_SUBSCRIBE ||
(subscription.option().retainHandling() == MqttSubscriptionOption.RetainedHandlingPolicy.SEND_AT_SUBSCRIBE_IF_NOT_YET_EXISTS && existingSubscription == null)) {
session.getRetainMessageManager().addRetainedMessagesToQueue(q, topicName);
session.getRetainMessageManager().addRetainedMessagesToQueue(q, parsedTopicName);
}
session.getState().addSubscription(subscription, session.getWildcardConfiguration(), subscriptionIdentifier);
@ -149,17 +147,9 @@ public class MQTTSubscriptionManager {
}
}
private Queue createQueueForSubscription(String address, String sharedSubscriptionName) throws Exception {
// determine the proper queue name
SimpleString queue;
if (sharedSubscriptionName != null) {
queue = SimpleString.toSimpleString(sharedSubscriptionName);
} else {
queue = getQueueNameForTopic(address);
}
private Queue createQueueForSubscription(String address, SimpleString queueName) throws Exception {
// check to see if a subscription queue already exists.
Queue q = session.getServer().locateQueue(queue);
Queue q = session.getServer().locateQueue(queueName);
// The queue does not exist so we need to create it.
if (q == null) {
@ -180,7 +170,7 @@ public class MQTTSubscriptionManager {
addressInfo = session.getServerSession().createAddress(SimpleString.toSimpleString(address),
RoutingType.MULTICAST, true);
}
return findOrCreateQueue(bindingQueryResult, addressInfo, queue);
return findOrCreateQueue(bindingQueryResult, addressInfo, queueName);
}
return q;
}
@ -269,13 +259,11 @@ public class MQTTSubscriptionManager {
short reasonCode = MQTTReasonCodes.SUCCESS;
try {
String internalAddress = MQTTUtil.convertMqttTopicFilterToCoreAddress(address, session.getWildcardConfiguration());
SimpleString internalQueueName = getQueueNameForTopic(internalAddress);
SimpleString internalQueueName = getQueueNameForTopic(address);
session.getState().removeSubscription(address);
Queue queue = session.getServer().locateQueue(internalQueueName);
SimpleString sAddress = SimpleString.toSimpleString(internalAddress);
AddressInfo addressInfo = session.getServerSession().getAddress(sAddress);
AddressInfo addressInfo = session.getServerSession().getAddress(SimpleString.toSimpleString(MQTTUtil.convertMqttTopicFilterToCoreAddress(address, session.getWildcardConfiguration())));
if (addressInfo != null && addressInfo.getRoutingTypes().contains(RoutingType.ANYCAST)) {
ServerConsumer consumer = consumers.get(address);
consumers.remove(address);
@ -314,7 +302,14 @@ public class MQTTSubscriptionManager {
}
private SimpleString getQueueNameForTopic(String topic) {
return new SimpleString(session.getState().getClientId() + "." + topic);
if (topic.startsWith(MQTTUtil.SHARED_SUBSCRIPTION_PREFIX)) {
int slashIndex = topic.indexOf(SLASH) + 1;
String sharedSubscriptionName = topic.substring(slashIndex, topic.indexOf(SLASH, slashIndex));
String parsedTopicName = topic.substring(topic.indexOf(SLASH, slashIndex) + 1);
return new SimpleString(sharedSubscriptionName).concat(".").concat(session.getState().getClientId()).concat(".").concat(parsedTopicName);
} else {
return new SimpleString(session.getState().getClientId()).concat(".").concat(topic);
}
}
/**

View File

@ -8,6 +8,21 @@ This chapter provides the following information for each release:
- **Note:** Follow the general upgrade procedure outlined in the [Upgrading the Broker](upgrading.md)
chapter in addition to any version-specific upgrade instructions outlined here.
## 2.28.0
[Full release notes]()
Highlights:
- ...
#### Upgrading from older versions
1. Due to [ARTEMIS-3871](https://issues.apache.org/jira/browse/ARTEMIS-3871) the naming pattern used for MQTT _shared_
subscription queues has changed. Previously the subscription queue was named according to the subscription name
provided in the MQTT `SUBSCRIBE` packet. However, MQTT allows the same name to be used across multiple subscriptions
whereas queues in the broker must be named uniquely. Now the subscription queue will be named according to the
subscription name, client ID, and topic name so that all subscription queue names will be unique. Before upgrading
please ensure all MQTT shared subscriptions are empty. When the subscribers reconnect they will get a new
subscription queue. If they are not empty you can move the messages to the new subscription queue administratively.
## 2.27.1
[Full release notes](https://issues.apache.org/jira/secure/ReleaseNote.jspa?version=12352610&projectId=12315920)
@ -17,7 +32,7 @@ Highlights:
- AMQP Large Message over Bridges were broken
- Rollback of massive transactions would take a long time to process
- Improvements to auto-create and auto-delete queues.
## 2.27.0
[Full release notes](https://issues.apache.org/jira/secure/ReleaseNote.jspa?version=12352246&projectId=12315920)

View File

@ -1842,7 +1842,7 @@ public class MQTTTest extends MQTTTestSupport {
Exception peerDisconnectedException = null;
try {
String clientId = "test.client";
SimpleString coreAddress = new SimpleString("foo.bar");
SimpleString coreAddress = new SimpleString("foo/bar");
Topic[] mqttSubscription = new Topic[]{new Topic("foo/bar", QoS.AT_LEAST_ONCE)};
getServer().createQueue(new QueueConfiguration(new SimpleString(clientId + "." + coreAddress)).setAddress(coreAddress).setRoutingType(RoutingType.MULTICAST).setDurable(false).setTemporary(true).setMaxConsumers(0));

View File

@ -32,6 +32,7 @@ import org.apache.activemq.artemis.core.paging.impl.PagingManagerImpl;
import org.apache.activemq.artemis.core.paging.impl.PagingManagerImplAccessor;
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTReasonCodes;
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTUtil;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
import org.apache.activemq.artemis.tests.util.RandomUtil;
@ -242,4 +243,103 @@ public class MQTT5Test extends MQTT5TestSupport {
AssertionLoggerHandler.stopCapture();
}
}
@Test(timeout = DEFAULT_TIMEOUT)
public void testSharedSubscriptionsWithSameName() throws Exception {
final String TOPIC1 = "myTopic1";
final String TOPIC2 = "myTopic2";
final String SUB_NAME = "mySub";
final String SHARED_SUB1 = MQTTUtil.SHARED_SUBSCRIPTION_PREFIX + SUB_NAME + "/" + TOPIC1;
final String SHARED_SUB2 = MQTTUtil.SHARED_SUBSCRIPTION_PREFIX + SUB_NAME + "/" + TOPIC2;
CountDownLatch ackLatch1 = new CountDownLatch(1);
CountDownLatch ackLatch2 = new CountDownLatch(1);
MqttClient consumer1 = createPahoClient("consumer1");
consumer1.connect();
consumer1.setCallback(new LatchedMqttCallback(ackLatch1));
consumer1.subscribe(SHARED_SUB1, 1);
assertNotNull(server.getAddressInfo(SimpleString.toSimpleString(TOPIC1)));
Queue q1 = getSubscriptionQueue(TOPIC1, "consumer1", SUB_NAME);
assertNotNull(q1);
assertEquals(TOPIC1, q1.getAddress().toString());
assertEquals(1, q1.getConsumerCount());
MqttClient consumer2 = createPahoClient("consumer2");
consumer2.connect();
consumer2.setCallback(new LatchedMqttCallback(ackLatch2));
consumer2.subscribe(SHARED_SUB2, 1);
assertNotNull(server.getAddressInfo(SimpleString.toSimpleString(TOPIC2)));
Queue q2 = getSubscriptionQueue(TOPIC2, "consumer2", SUB_NAME);
assertNotNull(q2);
assertEquals(TOPIC2, q2.getAddress().toString());
assertEquals(1, q2.getConsumerCount());
MqttClient producer = createPahoClient("producer");
producer.connect();
producer.publish(TOPIC1, new byte[0], 1, false);
producer.publish(TOPIC2, new byte[0], 1, false);
producer.disconnect();
producer.close();
assertTrue(ackLatch1.await(2, TimeUnit.SECONDS));
assertTrue(ackLatch2.await(2, TimeUnit.SECONDS));
consumer1.unsubscribe(SHARED_SUB1);
assertNull(getSubscriptionQueue(TOPIC1, "consumer1", SUB_NAME));
consumer2.unsubscribe(SHARED_SUB2);
assertNull(getSubscriptionQueue(TOPIC2, "consumer2", SUB_NAME));
consumer1.disconnect();
consumer1.close();
consumer2.disconnect();
consumer2.close();
}
@Test(timeout = DEFAULT_TIMEOUT)
public void testSharedSubscriptionsWithSameName2() throws Exception {
final String TOPIC1 = "myTopic1";
final String TOPIC2 = "myTopic2";
final String SUB_NAME = "mySub";
final String[] SHARED_SUBS = new String[]{
MQTTUtil.SHARED_SUBSCRIPTION_PREFIX + SUB_NAME + "/" + TOPIC1,
MQTTUtil.SHARED_SUBSCRIPTION_PREFIX + SUB_NAME + "/" + TOPIC2
};
CountDownLatch ackLatch = new CountDownLatch(2);
MqttClient consumer = createPahoClient("consumer1");
consumer.connect();
consumer.setCallback(new LatchedMqttCallback(ackLatch));
consumer.subscribe(SHARED_SUBS, new int[]{1, 1});
assertNotNull(server.getAddressInfo(SimpleString.toSimpleString(TOPIC1)));
Queue q1 = getSubscriptionQueue(TOPIC1, "consumer1", SUB_NAME);
assertNotNull(q1);
assertEquals(TOPIC1, q1.getAddress().toString());
assertEquals(1, q1.getConsumerCount());
assertNotNull(server.getAddressInfo(SimpleString.toSimpleString(TOPIC2)));
Queue q2 = getSubscriptionQueue(TOPIC2, "consumer1", SUB_NAME);
assertNotNull(q2);
assertEquals(TOPIC2, q2.getAddress().toString());
assertEquals(1, q2.getConsumerCount());
MqttClient producer = createPahoClient("producer");
producer.connect();
producer.publish(TOPIC1, new byte[0], 1, false);
producer.publish(TOPIC2, new byte[0], 1, false);
producer.disconnect();
producer.close();
assertTrue(ackLatch.await(2, TimeUnit.SECONDS));
consumer.unsubscribe(SHARED_SUBS);
assertNull(getSubscriptionQueue(TOPIC1, "consumer1", SUB_NAME));
assertNull(getSubscriptionQueue(TOPIC2, "consumer1", SUB_NAME));
consumer.disconnect();
consumer.close();
}
}

View File

@ -346,9 +346,17 @@ public class MQTT5TestSupport extends ActiveMQTestBase {
}
protected Queue getSubscriptionQueue(String TOPIC, String clientId) {
return getSubscriptionQueue(TOPIC, clientId, null);
}
protected Queue getSubscriptionQueue(String TOPIC, String clientId, String sharedSubscriptionName) {
try {
for (Binding b : server.getPostOffice().getMatchingBindings(SimpleString.toSimpleString(TOPIC))) {
if (((LocalQueueBinding)b).getQueue().getName().startsWith(SimpleString.toSimpleString(clientId))) {
if (sharedSubscriptionName != null) {
if (((LocalQueueBinding)b).getQueue().getName().startsWith(SimpleString.toSimpleString(sharedSubscriptionName))) {
return ((LocalQueueBinding)b).getQueue();
}
} else if (((LocalQueueBinding)b).getQueue().getName().startsWith(SimpleString.toSimpleString(clientId))) {
return ((LocalQueueBinding)b).getQueue();
}
}

View File

@ -21,6 +21,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTUtil;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.tests.integration.mqtt5.MQTT5TestSupport;
import org.apache.activemq.artemis.utils.Wait;
import org.eclipse.paho.mqttv5.client.MqttClient;
@ -126,9 +127,10 @@ public class SubscriptionTests extends MQTT5TestSupport {
consumer1.setCallback(new LatchedMqttCallback(ackLatch));
consumer1.subscribe(SHARED_SUB, 1);
assertNotNull(server.locateQueue(SUB_NAME));
assertEquals(TOPIC, server.locateQueue(SUB_NAME).getAddress().toString());
assertEquals(1, server.locateQueue(SUB_NAME).getConsumerCount());
Queue sharedSubQueue = server.locateQueue(SUB_NAME.concat(".").concat(consumer1.getClientId()).concat(".").concat(TOPIC));
assertNotNull(sharedSubQueue);
assertEquals(TOPIC, sharedSubQueue.getAddress().toString());
assertEquals(1, sharedSubQueue.getConsumerCount());
MqttClient producer = createPahoClient("producer");
producer.connect();