diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java index d894910fc3..1187db0325 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java @@ -25,8 +25,10 @@ import java.util.concurrent.ConcurrentMap; import io.netty.handler.codec.mqtt.MqttTopicSubscription; import org.apache.activemq.artemis.api.core.FilterConstants; import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle; import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.ServerConsumer; +import org.apache.activemq.artemis.core.server.impl.AddressInfo; public class MQTTSubscriptionManager { @@ -61,7 +63,8 @@ public class MQTTSubscriptionManager { synchronized void start() throws Exception { for (MqttTopicSubscription subscription : session.getSessionState().getSubscriptions()) { - Queue q = createQueueForSubscription(subscription.topicName(), subscription.qualityOfService().value()); + String coreAddress = MQTTUtil.convertMQTTAddressFilterToCore(subscription.topicName()); + Queue q = createQueueForSubscription(coreAddress, subscription.qualityOfService().value()); createConsumerForSubscriptionQueue(q, subscription.topicName(), subscription.qualityOfService().value()); } } @@ -84,13 +87,17 @@ public class MQTTSubscriptionManager { /** * Creates a Queue if it doesn't already exist, based on a topic and address. Returning the queue name. */ - private Queue createQueueForSubscription(String topic, int qos) throws Exception { - String address = MQTTUtil.convertMQTTAddressFilterToCore(topic); + private Queue createQueueForSubscription(String address, int qos) throws Exception { + SimpleString queue = getQueueNameForTopic(address); Queue q = session.getServer().locateQueue(queue); if (q == null) { - q = session.getServerSession().createQueue(new SimpleString(address), queue, managementFilter, false, MQTTUtil.DURABLE_MESSAGES && qos >= 0); + q = session.getServerSession().createQueue(new SimpleString(address), queue, managementFilter, false, MQTTUtil.DURABLE_MESSAGES && qos >= 0, -1, false); + } else { + if (q.isDeleteOnNoConsumers()) { + throw ActiveMQMessageBundle.BUNDLE.invalidQueueConfiguration(q.getAddress(), q.getName(), "deleteOnNoConsumers", false, true); + } } return q; } @@ -113,9 +120,15 @@ public class MQTTSubscriptionManager { int qos = subscription.qualityOfService().value(); String topic = subscription.topicName(); + String coreAddress = MQTTUtil.convertMQTTAddressFilterToCore(topic); + AddressInfo addressInfo = session.getServer().getAddressInfo(new SimpleString(coreAddress)); + if (addressInfo != null && addressInfo.getRoutingType() != AddressInfo.RoutingType.MULTICAST) { + throw ActiveMQMessageBundle.BUNDLE.unexpectedRoutingTypeForAddress(new SimpleString(coreAddress), AddressInfo.RoutingType.MULTICAST, addressInfo.getRoutingType()); + } + session.getSessionState().addSubscription(subscription); - Queue q = createQueueForSubscription(topic, qos); + Queue q = createQueueForSubscription(coreAddress, qos); if (s == null) { createConsumerForSubscriptionQueue(q, topic, qos); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java index b809df0c49..e99fc96ab3 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java @@ -22,6 +22,7 @@ import javax.jms.Destination; import javax.jms.MessageConsumer; import javax.jms.MessageProducer; import javax.jms.Session; +import java.io.EOFException; import java.lang.reflect.Field; import java.net.ProtocolException; import java.util.ArrayList; @@ -34,8 +35,10 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.regex.Pattern; +import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.protocol.mqtt.MQTTConnectionManager; import org.apache.activemq.artemis.core.protocol.mqtt.MQTTSession; +import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.apache.activemq.artemis.tests.integration.mqtt.imported.util.Wait; import org.fusesource.mqtt.client.BlockingConnection; import org.fusesource.mqtt.client.MQTT; @@ -1612,4 +1615,79 @@ public class MQTTTest extends MQTTTestSupport { connection.disconnect(); } + + @Test(timeout = 60 * 1000) + public void testClientDisconnectedOnMaxConsumerLimitReached() throws Exception { + Exception peerDisconnectedException = null; + try { + String clientId = "test.client"; + SimpleString coreAddress = new SimpleString("foo.bar"); + Topic[] mqttSubscription = new Topic[]{new Topic("foo/bar", QoS.AT_LEAST_ONCE)}; + + AddressInfo addressInfo = new AddressInfo(coreAddress); + addressInfo.setDefaultMaxConsumers(0); + getServer().createOrUpdateAddressInfo(addressInfo); + + getServer().createQueue(coreAddress, new SimpleString(clientId + "." + coreAddress), null, false, true, 0, false); + + MQTT mqtt = createMQTTConnection(); + mqtt.setClientId(clientId); + mqtt.setKeepAlive((short) 2); + final BlockingConnection connection = mqtt.blockingConnection(); + connection.connect(); + connection.subscribe(mqttSubscription); + } catch (EOFException e) { + peerDisconnectedException = e; + } + assertNotNull(peerDisconnectedException); + assertTrue(peerDisconnectedException.getMessage().contains("Peer disconnected")); + } + + @Test(timeout = 60 * 1000) + public void testClientDisconnectedWhenTryingToSubscribeToAnAnycastAddress() throws Exception { + Exception peerDisconnectedException = null; + try { + String clientId = "test.mqtt"; + SimpleString coreAddress = new SimpleString("foo.bar"); + Topic[] mqttSubscription = new Topic[]{new Topic("foo/bar", QoS.AT_LEAST_ONCE)}; + + AddressInfo addressInfo = new AddressInfo(coreAddress); + addressInfo.setRoutingType(AddressInfo.RoutingType.ANYCAST); + getServer().createOrUpdateAddressInfo(addressInfo); + + MQTT mqtt = createMQTTConnection(); + mqtt.setClientId(clientId); + mqtt.setKeepAlive((short) 2); + final BlockingConnection connection = mqtt.blockingConnection(); + connection.connect(); + connection.subscribe(mqttSubscription); + } catch (EOFException e) { + peerDisconnectedException = e; + } + assertNotNull(peerDisconnectedException); + assertTrue(peerDisconnectedException.getMessage().contains("Peer disconnected")); + } + + @Test(timeout = 60 * 1000) + public void testClientDisconnectedWhenTryingToSubscribeToAnExistingQueueWithDeleteOnNoConsumers() throws Exception { + Exception peerDisconnectedException = null; + try { + String clientId = "testMqtt"; + SimpleString coreAddress = new SimpleString("foo.bar"); + getServer().createQueue(coreAddress, new SimpleString(clientId + "." + coreAddress), null, false, true, -1, true); + + Topic[] mqttSubscription = new Topic[]{new Topic("foo/bar", QoS.AT_LEAST_ONCE)}; + + MQTT mqtt = createMQTTConnection(); + mqtt.setClientId(clientId); + mqtt.setKeepAlive((short) 2); + final BlockingConnection connection = mqtt.blockingConnection(); + connection.connect(); + connection.subscribe(mqttSubscription); + } catch (EOFException e) { + peerDisconnectedException = e; + } + assertNotNull(peerDisconnectedException); + assertTrue(peerDisconnectedException.getMessage().contains("Peer disconnected")); + } } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTestSupport.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTestSupport.java index 27ebde06c8..15cb8b6b7c 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTestSupport.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTestSupport.java @@ -92,6 +92,10 @@ public class MQTTTestSupport extends ActiveMQTestBase { return name.getMethodName(); } + public ActiveMQServer getServer() { + return server; + } + @Override @Before public void setUp() throws Exception {