From b5e25eb4fe5b49b0562c9b94fbe290c9fd48926b Mon Sep 17 00:00:00 2001 From: Justin Bertram Date: Mon, 14 Nov 2022 11:00:34 -0600 Subject: [PATCH] ARTEMIS-3871 uniquely name MQTT share sub queues --- .../mqtt/MQTTSubscriptionManager.java | 57 +++++----- docs/user-manual/en/versions.md | 17 ++- .../tests/integration/mqtt/MQTTTest.java | 2 +- .../tests/integration/mqtt5/MQTT5Test.java | 100 ++++++++++++++++++ .../integration/mqtt5/MQTT5TestSupport.java | 10 +- .../mqtt5/spec/SubscriptionTests.java | 8 +- 6 files changed, 157 insertions(+), 37 deletions(-) diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java index 83757b7c16..4b0ea51ab1 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java @@ -103,37 +103,35 @@ public class MQTTSubscriptionManager { } private void addSubscription(MqttTopicSubscription subscription, Integer subscriptionIdentifier, boolean initialStart) throws Exception { - String topicName = CompositeAddress.extractAddressName(subscription.topicName()); - String sharedSubscriptionName = null; + String rawTopicName = CompositeAddress.extractAddressName(subscription.topicName()); + String parsedTopicName = rawTopicName; - // if using a shared subscription then parse the subscription name and topic - if (topicName.startsWith(MQTTUtil.SHARED_SUBSCRIPTION_PREFIX)) { - int slashIndex = topicName.indexOf(SLASH) + 1; - sharedSubscriptionName = topicName.substring(slashIndex, topicName.indexOf(SLASH, slashIndex)); - topicName = topicName.substring(topicName.indexOf(SLASH, slashIndex) + 1); + // if using a shared subscription then parse + if (rawTopicName.startsWith(MQTTUtil.SHARED_SUBSCRIPTION_PREFIX)) { + parsedTopicName = rawTopicName.substring(rawTopicName.indexOf(SLASH, rawTopicName.indexOf(SLASH) + 1) + 1); } int qos = subscription.qualityOfService().value(); - String coreAddress = MQTTUtil.convertMqttTopicFilterToCoreAddress(topicName, session.getWildcardConfiguration()); + String coreAddress = MQTTUtil.convertMqttTopicFilterToCoreAddress(parsedTopicName, session.getWildcardConfiguration()); - Queue q = createQueueForSubscription(coreAddress, sharedSubscriptionName); + Queue q = createQueueForSubscription(coreAddress, getQueueNameForTopic(rawTopicName)); if (initialStart) { - createConsumerForSubscriptionQueue(q, topicName, qos, subscription.option().isNoLocal(), null); + createConsumerForSubscriptionQueue(q, parsedTopicName, qos, subscription.option().isNoLocal(), null); } else { - MqttTopicSubscription existingSubscription = session.getState().getSubscription(topicName); + MqttTopicSubscription existingSubscription = session.getState().getSubscription(parsedTopicName); if (existingSubscription == null) { - createConsumerForSubscriptionQueue(q, topicName, qos, subscription.option().isNoLocal(), null); + createConsumerForSubscriptionQueue(q, parsedTopicName, qos, subscription.option().isNoLocal(), null); } else { - Long existingConsumerId = consumers.get(topicName).getID(); + Long existingConsumerId = consumers.get(parsedTopicName).getID(); consumerQoSLevels.put(existingConsumerId, qos); if (existingSubscription.option().isNoLocal() != subscription.option().isNoLocal()) { - createConsumerForSubscriptionQueue(q, topicName, qos, subscription.option().isNoLocal(), existingConsumerId); + createConsumerForSubscriptionQueue(q, parsedTopicName, qos, subscription.option().isNoLocal(), existingConsumerId); } } if (subscription.option().retainHandling() == MqttSubscriptionOption.RetainedHandlingPolicy.SEND_AT_SUBSCRIBE || (subscription.option().retainHandling() == MqttSubscriptionOption.RetainedHandlingPolicy.SEND_AT_SUBSCRIBE_IF_NOT_YET_EXISTS && existingSubscription == null)) { - session.getRetainMessageManager().addRetainedMessagesToQueue(q, topicName); + session.getRetainMessageManager().addRetainedMessagesToQueue(q, parsedTopicName); } session.getState().addSubscription(subscription, session.getWildcardConfiguration(), subscriptionIdentifier); @@ -149,17 +147,9 @@ public class MQTTSubscriptionManager { } } - private Queue createQueueForSubscription(String address, String sharedSubscriptionName) throws Exception { - // determine the proper queue name - SimpleString queue; - if (sharedSubscriptionName != null) { - queue = SimpleString.toSimpleString(sharedSubscriptionName); - } else { - queue = getQueueNameForTopic(address); - } - + private Queue createQueueForSubscription(String address, SimpleString queueName) throws Exception { // check to see if a subscription queue already exists. - Queue q = session.getServer().locateQueue(queue); + Queue q = session.getServer().locateQueue(queueName); // The queue does not exist so we need to create it. if (q == null) { @@ -180,7 +170,7 @@ public class MQTTSubscriptionManager { addressInfo = session.getServerSession().createAddress(SimpleString.toSimpleString(address), RoutingType.MULTICAST, true); } - return findOrCreateQueue(bindingQueryResult, addressInfo, queue); + return findOrCreateQueue(bindingQueryResult, addressInfo, queueName); } return q; } @@ -269,13 +259,11 @@ public class MQTTSubscriptionManager { short reasonCode = MQTTReasonCodes.SUCCESS; try { - String internalAddress = MQTTUtil.convertMqttTopicFilterToCoreAddress(address, session.getWildcardConfiguration()); - SimpleString internalQueueName = getQueueNameForTopic(internalAddress); + SimpleString internalQueueName = getQueueNameForTopic(address); session.getState().removeSubscription(address); Queue queue = session.getServer().locateQueue(internalQueueName); - SimpleString sAddress = SimpleString.toSimpleString(internalAddress); - AddressInfo addressInfo = session.getServerSession().getAddress(sAddress); + AddressInfo addressInfo = session.getServerSession().getAddress(SimpleString.toSimpleString(MQTTUtil.convertMqttTopicFilterToCoreAddress(address, session.getWildcardConfiguration()))); if (addressInfo != null && addressInfo.getRoutingTypes().contains(RoutingType.ANYCAST)) { ServerConsumer consumer = consumers.get(address); consumers.remove(address); @@ -314,7 +302,14 @@ public class MQTTSubscriptionManager { } private SimpleString getQueueNameForTopic(String topic) { - return new SimpleString(session.getState().getClientId() + "." + topic); + if (topic.startsWith(MQTTUtil.SHARED_SUBSCRIPTION_PREFIX)) { + int slashIndex = topic.indexOf(SLASH) + 1; + String sharedSubscriptionName = topic.substring(slashIndex, topic.indexOf(SLASH, slashIndex)); + String parsedTopicName = topic.substring(topic.indexOf(SLASH, slashIndex) + 1); + return new SimpleString(sharedSubscriptionName).concat(".").concat(session.getState().getClientId()).concat(".").concat(parsedTopicName); + } else { + return new SimpleString(session.getState().getClientId()).concat(".").concat(topic); + } } /** diff --git a/docs/user-manual/en/versions.md b/docs/user-manual/en/versions.md index 3454c97317..538042471c 100644 --- a/docs/user-manual/en/versions.md +++ b/docs/user-manual/en/versions.md @@ -8,6 +8,21 @@ This chapter provides the following information for each release: - **Note:** Follow the general upgrade procedure outlined in the [Upgrading the Broker](upgrading.md) chapter in addition to any version-specific upgrade instructions outlined here. +## 2.28.0 +[Full release notes]() + +Highlights: +- ... + +#### Upgrading from older versions +1. Due to [ARTEMIS-3871](https://issues.apache.org/jira/browse/ARTEMIS-3871) the naming pattern used for MQTT _shared_ + subscription queues has changed. Previously the subscription queue was named according to the subscription name + provided in the MQTT `SUBSCRIBE` packet. However, MQTT allows the same name to be used across multiple subscriptions + whereas queues in the broker must be named uniquely. Now the subscription queue will be named according to the + subscription name, client ID, and topic name so that all subscription queue names will be unique. Before upgrading + please ensure all MQTT shared subscriptions are empty. When the subscribers reconnect they will get a new + subscription queue. If they are not empty you can move the messages to the new subscription queue administratively. + ## 2.27.1 [Full release notes](https://issues.apache.org/jira/secure/ReleaseNote.jspa?version=12352610&projectId=12315920) @@ -17,7 +32,7 @@ Highlights: - AMQP Large Message over Bridges were broken - Rollback of massive transactions would take a long time to process - Improvements to auto-create and auto-delete queues. - + ## 2.27.0 [Full release notes](https://issues.apache.org/jira/secure/ReleaseNote.jspa?version=12352246&projectId=12315920) diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MQTTTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MQTTTest.java index adc39a276c..a68d0f66d6 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MQTTTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MQTTTest.java @@ -1842,7 +1842,7 @@ public class MQTTTest extends MQTTTestSupport { Exception peerDisconnectedException = null; try { String clientId = "test.client"; - SimpleString coreAddress = new SimpleString("foo.bar"); + SimpleString coreAddress = new SimpleString("foo/bar"); Topic[] mqttSubscription = new Topic[]{new Topic("foo/bar", QoS.AT_LEAST_ONCE)}; getServer().createQueue(new QueueConfiguration(new SimpleString(clientId + "." + coreAddress)).setAddress(coreAddress).setRoutingType(RoutingType.MULTICAST).setDurable(false).setTemporary(true).setMaxConsumers(0)); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5Test.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5Test.java index 67deeba240..d88f4a6981 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5Test.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5Test.java @@ -32,6 +32,7 @@ import org.apache.activemq.artemis.core.paging.impl.PagingManagerImpl; import org.apache.activemq.artemis.core.paging.impl.PagingManagerImplAccessor; import org.apache.activemq.artemis.core.protocol.mqtt.MQTTReasonCodes; import org.apache.activemq.artemis.core.protocol.mqtt.MQTTUtil; +import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.logs.AssertionLoggerHandler; import org.apache.activemq.artemis.tests.util.RandomUtil; @@ -242,4 +243,103 @@ public class MQTT5Test extends MQTT5TestSupport { AssertionLoggerHandler.stopCapture(); } } + + @Test(timeout = DEFAULT_TIMEOUT) + public void testSharedSubscriptionsWithSameName() throws Exception { + final String TOPIC1 = "myTopic1"; + final String TOPIC2 = "myTopic2"; + final String SUB_NAME = "mySub"; + final String SHARED_SUB1 = MQTTUtil.SHARED_SUBSCRIPTION_PREFIX + SUB_NAME + "/" + TOPIC1; + final String SHARED_SUB2 = MQTTUtil.SHARED_SUBSCRIPTION_PREFIX + SUB_NAME + "/" + TOPIC2; + CountDownLatch ackLatch1 = new CountDownLatch(1); + CountDownLatch ackLatch2 = new CountDownLatch(1); + + MqttClient consumer1 = createPahoClient("consumer1"); + consumer1.connect(); + consumer1.setCallback(new LatchedMqttCallback(ackLatch1)); + consumer1.subscribe(SHARED_SUB1, 1); + + assertNotNull(server.getAddressInfo(SimpleString.toSimpleString(TOPIC1))); + Queue q1 = getSubscriptionQueue(TOPIC1, "consumer1", SUB_NAME); + assertNotNull(q1); + assertEquals(TOPIC1, q1.getAddress().toString()); + assertEquals(1, q1.getConsumerCount()); + + MqttClient consumer2 = createPahoClient("consumer2"); + consumer2.connect(); + consumer2.setCallback(new LatchedMqttCallback(ackLatch2)); + consumer2.subscribe(SHARED_SUB2, 1); + + assertNotNull(server.getAddressInfo(SimpleString.toSimpleString(TOPIC2))); + Queue q2 = getSubscriptionQueue(TOPIC2, "consumer2", SUB_NAME); + assertNotNull(q2); + assertEquals(TOPIC2, q2.getAddress().toString()); + assertEquals(1, q2.getConsumerCount()); + + MqttClient producer = createPahoClient("producer"); + producer.connect(); + producer.publish(TOPIC1, new byte[0], 1, false); + producer.publish(TOPIC2, new byte[0], 1, false); + producer.disconnect(); + producer.close(); + + assertTrue(ackLatch1.await(2, TimeUnit.SECONDS)); + assertTrue(ackLatch2.await(2, TimeUnit.SECONDS)); + + consumer1.unsubscribe(SHARED_SUB1); + assertNull(getSubscriptionQueue(TOPIC1, "consumer1", SUB_NAME)); + + consumer2.unsubscribe(SHARED_SUB2); + assertNull(getSubscriptionQueue(TOPIC2, "consumer2", SUB_NAME)); + + consumer1.disconnect(); + consumer1.close(); + consumer2.disconnect(); + consumer2.close(); + } + + @Test(timeout = DEFAULT_TIMEOUT) + public void testSharedSubscriptionsWithSameName2() throws Exception { + final String TOPIC1 = "myTopic1"; + final String TOPIC2 = "myTopic2"; + final String SUB_NAME = "mySub"; + final String[] SHARED_SUBS = new String[]{ + MQTTUtil.SHARED_SUBSCRIPTION_PREFIX + SUB_NAME + "/" + TOPIC1, + MQTTUtil.SHARED_SUBSCRIPTION_PREFIX + SUB_NAME + "/" + TOPIC2 + }; + CountDownLatch ackLatch = new CountDownLatch(2); + + MqttClient consumer = createPahoClient("consumer1"); + consumer.connect(); + consumer.setCallback(new LatchedMqttCallback(ackLatch)); + consumer.subscribe(SHARED_SUBS, new int[]{1, 1}); + + assertNotNull(server.getAddressInfo(SimpleString.toSimpleString(TOPIC1))); + Queue q1 = getSubscriptionQueue(TOPIC1, "consumer1", SUB_NAME); + assertNotNull(q1); + assertEquals(TOPIC1, q1.getAddress().toString()); + assertEquals(1, q1.getConsumerCount()); + + assertNotNull(server.getAddressInfo(SimpleString.toSimpleString(TOPIC2))); + Queue q2 = getSubscriptionQueue(TOPIC2, "consumer1", SUB_NAME); + assertNotNull(q2); + assertEquals(TOPIC2, q2.getAddress().toString()); + assertEquals(1, q2.getConsumerCount()); + + MqttClient producer = createPahoClient("producer"); + producer.connect(); + producer.publish(TOPIC1, new byte[0], 1, false); + producer.publish(TOPIC2, new byte[0], 1, false); + producer.disconnect(); + producer.close(); + + assertTrue(ackLatch.await(2, TimeUnit.SECONDS)); + + consumer.unsubscribe(SHARED_SUBS); + assertNull(getSubscriptionQueue(TOPIC1, "consumer1", SUB_NAME)); + assertNull(getSubscriptionQueue(TOPIC2, "consumer1", SUB_NAME)); + + consumer.disconnect(); + consumer.close(); + } } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5TestSupport.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5TestSupport.java index 1f4349271d..6891d0311e 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5TestSupport.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5TestSupport.java @@ -346,9 +346,17 @@ public class MQTT5TestSupport extends ActiveMQTestBase { } protected Queue getSubscriptionQueue(String TOPIC, String clientId) { + return getSubscriptionQueue(TOPIC, clientId, null); + } + + protected Queue getSubscriptionQueue(String TOPIC, String clientId, String sharedSubscriptionName) { try { for (Binding b : server.getPostOffice().getMatchingBindings(SimpleString.toSimpleString(TOPIC))) { - if (((LocalQueueBinding)b).getQueue().getName().startsWith(SimpleString.toSimpleString(clientId))) { + if (sharedSubscriptionName != null) { + if (((LocalQueueBinding)b).getQueue().getName().startsWith(SimpleString.toSimpleString(sharedSubscriptionName))) { + return ((LocalQueueBinding)b).getQueue(); + } + } else if (((LocalQueueBinding)b).getQueue().getName().startsWith(SimpleString.toSimpleString(clientId))) { return ((LocalQueueBinding)b).getQueue(); } } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/SubscriptionTests.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/SubscriptionTests.java index 0619589843..8e61c17969 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/SubscriptionTests.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/SubscriptionTests.java @@ -21,6 +21,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import org.apache.activemq.artemis.core.protocol.mqtt.MQTTUtil; +import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.tests.integration.mqtt5.MQTT5TestSupport; import org.apache.activemq.artemis.utils.Wait; import org.eclipse.paho.mqttv5.client.MqttClient; @@ -126,9 +127,10 @@ public class SubscriptionTests extends MQTT5TestSupport { consumer1.setCallback(new LatchedMqttCallback(ackLatch)); consumer1.subscribe(SHARED_SUB, 1); - assertNotNull(server.locateQueue(SUB_NAME)); - assertEquals(TOPIC, server.locateQueue(SUB_NAME).getAddress().toString()); - assertEquals(1, server.locateQueue(SUB_NAME).getConsumerCount()); + Queue sharedSubQueue = server.locateQueue(SUB_NAME.concat(".").concat(consumer1.getClientId()).concat(".").concat(TOPIC)); + assertNotNull(sharedSubQueue); + assertEquals(TOPIC, sharedSubQueue.getAddress().toString()); + assertEquals(1, sharedSubQueue.getConsumerCount()); MqttClient producer = createPahoClient("producer"); producer.connect();