diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java index c4b8b94c80..f09e5c53a0 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java @@ -17,17 +17,21 @@ package org.apache.activemq.artemis.core.protocol.mqtt; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import io.netty.handler.codec.mqtt.MqttTopicSubscription; +import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException; import org.apache.activemq.artemis.api.core.FilterConstants; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle; -import org.apache.activemq.artemis.core.server.RoutingType; +import org.apache.activemq.artemis.core.server.BindingQueryResult; import org.apache.activemq.artemis.core.server.Queue; +import org.apache.activemq.artemis.core.server.RoutingType; import org.apache.activemq.artemis.core.server.ServerConsumer; import org.apache.activemq.artemis.core.server.impl.AddressInfo; @@ -89,20 +93,65 @@ public class MQTTSubscriptionManager { * Creates a Queue if it doesn't already exist, based on a topic and address. Returning the queue name. */ private Queue createQueueForSubscription(String address, int qos) throws Exception { - + // Check to see if a subscription queue already exists. SimpleString queue = getQueueNameForTopic(address); - Queue q = session.getServer().locateQueue(queue); + + // The queue does not exist so we need to create it. if (q == null) { - q = session.getServerSession().createQueue(new SimpleString(address), queue, RoutingType.MULTICAST, managementFilter, false, MQTTUtil.DURABLE_MESSAGES && qos >= 0, false); - } else { - if (q.isDeleteOnNoConsumers()) { - throw ActiveMQMessageBundle.BUNDLE.invalidQueueConfiguration(q.getAddress(), q.getName(), "deleteOnNoConsumers", false, true); + SimpleString sAddress = SimpleString.toSimpleString(address); + + // Check we can auto create queues. + BindingQueryResult bindingQueryResult = session.getServerSession().executeBindingQuery(sAddress); + if (!bindingQueryResult.isAutoCreateQueues()) { + throw ActiveMQMessageBundle.BUNDLE.noSuchQueue(sAddress); } + + // Check that the address exists, if not we try to auto create it. + AddressInfo addressInfo = session.getServerSession().getAddress(sAddress); + if (addressInfo == null) { + if (!bindingQueryResult.isAutoCreateAddresses()) { + throw ActiveMQMessageBundle.BUNDLE.addressDoesNotExist(SimpleString.toSimpleString(address)); + } + addressInfo = session.getServerSession().createAddress(SimpleString.toSimpleString(address), RoutingType.MULTICAST, false); + } + return findOrCreateQueue(bindingQueryResult, addressInfo, queue, qos); } return q; } + private Queue findOrCreateQueue(BindingQueryResult bindingQueryResult, AddressInfo addressInfo, SimpleString queue, int qos) throws Exception { + + if (addressInfo.getRoutingTypes().contains(RoutingType.MULTICAST)) { + return session.getServerSession().createQueue(addressInfo.getName(), queue, RoutingType.MULTICAST, managementFilter, false, MQTTUtil.DURABLE_MESSAGES && qos >= 0, false); + } + + if (addressInfo.getRoutingTypes().contains(RoutingType.ANYCAST)) { + if (!bindingQueryResult.getQueueNames().isEmpty()) { + SimpleString name = null; + for (SimpleString qName : bindingQueryResult.getQueueNames()) { + if (name == null) { + name = qName; + } else if (qName.equals(addressInfo.getName())) { + name = qName; + } + } + return session.getServer().locateQueue(name); + } else { + try { + return session.getServerSession().createQueue(addressInfo.getName(), addressInfo.getName(), RoutingType.ANYCAST, managementFilter, false, MQTTUtil.DURABLE_MESSAGES && qos >= 0, false); + } catch (ActiveMQQueueExistsException e) { + return session.getServer().locateQueue(addressInfo.getName()); + } + } + } + + Set routingTypeSet = new HashSet(); + routingTypeSet.add(RoutingType.MULTICAST); + routingTypeSet.add(RoutingType.ANYCAST); + throw ActiveMQMessageBundle.BUNDLE.invalidRoutingTypeForAddress(addressInfo.getRoutingType(), addressInfo.getName().toString(), routingTypeSet); + } + /** * Creates a new consumer for the queue associated with a subscription */ @@ -122,10 +171,6 @@ public class MQTTSubscriptionManager { String topic = subscription.topicName(); String coreAddress = MQTTUtil.convertMQTTAddressFilterToCore(topic); - AddressInfo addressInfo = session.getServer().getAddressInfo(new SimpleString(coreAddress)); - if (addressInfo != null && !addressInfo.getRoutingTypes().contains(RoutingType.MULTICAST)) { - throw ActiveMQMessageBundle.BUNDLE.unexpectedRoutingTypeForAddress(new SimpleString(coreAddress), RoutingType.MULTICAST, addressInfo.getRoutingTypes()); - } session.getSessionState().addSubscription(subscription); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java index e80bdc0748..49cf471b55 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java @@ -1510,7 +1510,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener { @Override public AddressInfo getAddress(SimpleString address) { - return server.getPostOffice().getAddressInfo(address); + return server.getPostOffice().getAddressInfo(removePrefix(address)); } @Override diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java index 58d75d8a60..79029be905 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java @@ -27,9 +27,11 @@ import java.lang.reflect.Field; import java.net.ProtocolException; import java.util.ArrayList; import java.util.Arrays; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Random; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -38,7 +40,6 @@ import java.util.regex.Pattern; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.protocol.mqtt.MQTTConnectionManager; import org.apache.activemq.artemis.core.protocol.mqtt.MQTTSession; -import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.RoutingType; import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.apache.activemq.artemis.tests.integration.mqtt.imported.util.Wait; @@ -1642,50 +1643,107 @@ public class MQTTTest extends MQTTTestSupport { } @Test(timeout = 60 * 1000) - public void testClientDisconnectedWhenTryingToSubscribeToAnAnycastAddress() throws Exception { - Exception peerDisconnectedException = null; - try { - String clientId = "test.mqtt"; - SimpleString coreAddress = new SimpleString("foo.bar"); - Topic[] mqttSubscription = new Topic[]{new Topic("foo/bar", QoS.AT_LEAST_ONCE)}; + public void testAnycastPrefixWorksWithMQTT() throws Exception { + String clientId = "testMqtt"; - AddressInfo addressInfo = new AddressInfo(coreAddress); - addressInfo.addRoutingType(RoutingType.ANYCAST); - getServer().createOrUpdateAddressInfo(addressInfo); + String anycastAddress = "anycast:foo/bar"; + String sendAddress = "foo/bar"; + Topic[] mqttSubscription = new Topic[]{new Topic(anycastAddress, QoS.AT_LEAST_ONCE)}; - MQTT mqtt = createMQTTConnection(); - mqtt.setClientId(clientId); - mqtt.setKeepAlive((short) 2); - final BlockingConnection connection = mqtt.blockingConnection(); - connection.connect(); - connection.subscribe(mqttSubscription); - } catch (EOFException e) { - peerDisconnectedException = e; - } - assertNotNull(peerDisconnectedException); - assertTrue(peerDisconnectedException.getMessage().contains("Peer disconnected")); + MQTT mqtt = createMQTTConnection(); + mqtt.setClientId(clientId); + BlockingConnection connection1 = mqtt.blockingConnection(); + connection1.connect(); + connection1.subscribe(mqttSubscription); + + MQTT mqtt2 = createMQTTConnection(); + mqtt2.setClientId(clientId + "2"); + BlockingConnection connection2 = mqtt2.blockingConnection(); + connection2.connect(); + connection2.subscribe(mqttSubscription); + + String message1 = "TestMessage1"; + String message2 = "TestMessage2"; + + connection1.publish(sendAddress, message1.getBytes(), QoS.AT_LEAST_ONCE, false); + connection2.publish(sendAddress, message2.getBytes(), QoS.AT_LEAST_ONCE, false); + + assertNotNull(connection1.receive(1000, TimeUnit.MILLISECONDS)); + assertNull(connection1.receive(1000, TimeUnit.MILLISECONDS)); + + assertNotNull(connection2.receive(1000, TimeUnit.MILLISECONDS)); + assertNull(connection2.receive(1000, TimeUnit.MILLISECONDS)); } @Test(timeout = 60 * 1000) - public void testClientDisconnectedWhenTryingToSubscribeToAnExistingQueueWithDeleteOnNoConsumers() throws Exception { - Exception peerDisconnectedException = null; - try { - String clientId = "testMqtt"; - SimpleString coreAddress = new SimpleString("foo.bar"); - getServer().createQueue(coreAddress, RoutingType.MULTICAST, new SimpleString(clientId + "." + coreAddress), null, false, true, Queue.MAX_CONSUMERS_UNLIMITED, true, true); + public void testAnycastAddressWorksWithMQTT() throws Exception { + String anycastAddress = "foo/bar"; - Topic[] mqttSubscription = new Topic[]{new Topic("foo/bar", QoS.AT_LEAST_ONCE)}; + getServer().createAddressInfo(new AddressInfo(SimpleString.toSimpleString("foo.bar"), RoutingType.ANYCAST)); + String clientId = "testMqtt"; - MQTT mqtt = createMQTTConnection(); - mqtt.setClientId(clientId); - mqtt.setKeepAlive((short) 2); - final BlockingConnection connection = mqtt.blockingConnection(); - connection.connect(); - connection.subscribe(mqttSubscription); - } catch (EOFException e) { - peerDisconnectedException = e; - } - assertNotNull(peerDisconnectedException); - assertTrue(peerDisconnectedException.getMessage().contains("Peer disconnected")); + Topic[] mqttSubscription = new Topic[]{new Topic(anycastAddress, QoS.AT_LEAST_ONCE)}; + + MQTT mqtt = createMQTTConnection(); + mqtt.setClientId(clientId); + BlockingConnection connection1 = mqtt.blockingConnection(); + connection1.connect(); + connection1.subscribe(mqttSubscription); + + MQTT mqtt2 = createMQTTConnection(); + mqtt2.setClientId(clientId + "2"); + BlockingConnection connection2 = mqtt2.blockingConnection(); + connection2.connect(); + connection2.subscribe(mqttSubscription); + + String message1 = "TestMessage1"; + String message2 = "TestMessage2"; + + connection1.publish(anycastAddress, message1.getBytes(), QoS.AT_LEAST_ONCE, false); + connection2.publish(anycastAddress, message2.getBytes(), QoS.AT_LEAST_ONCE, false); + + assertNotNull(connection1.receive(1000, TimeUnit.MILLISECONDS)); + assertNull(connection1.receive(1000, TimeUnit.MILLISECONDS)); + + assertNotNull(connection2.receive(1000, TimeUnit.MILLISECONDS)); + assertNull(connection2.receive(1000, TimeUnit.MILLISECONDS)); + } + + @Test(timeout = 60 * 1000) + public void testAmbiguousRoutingWithMQTT() throws Exception { + String anycastAddress = "foo/bar"; + + Set routingTypeSet = new HashSet<>(); + routingTypeSet.add(RoutingType.ANYCAST); + routingTypeSet.add(RoutingType.MULTICAST); + + getServer().createAddressInfo(new AddressInfo(SimpleString.toSimpleString("foo.bar"), routingTypeSet)); + String clientId = "testMqtt"; + + Topic[] mqttSubscription = new Topic[]{new Topic(anycastAddress, QoS.AT_LEAST_ONCE)}; + + MQTT mqtt = createMQTTConnection(); + mqtt.setClientId(clientId); + BlockingConnection connection1 = mqtt.blockingConnection(); + connection1.connect(); + connection1.subscribe(mqttSubscription); + + MQTT mqtt2 = createMQTTConnection(); + mqtt2.setClientId(clientId + "2"); + BlockingConnection connection2 = mqtt2.blockingConnection(); + connection2.connect(); + connection2.subscribe(mqttSubscription); + + String message1 = "TestMessage1"; + String message2 = "TestMessage2"; + + connection1.publish(anycastAddress, message1.getBytes(), QoS.AT_LEAST_ONCE, false); + connection2.publish(anycastAddress, message2.getBytes(), QoS.AT_LEAST_ONCE, false); + + assertNotNull(connection1.receive(1000, TimeUnit.MILLISECONDS)); + assertNotNull(connection1.receive(1000, TimeUnit.MILLISECONDS)); + + assertNotNull(connection2.receive(1000, TimeUnit.MILLISECONDS)); + assertNotNull(connection2.receive(1000, TimeUnit.MILLISECONDS)); } } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTestSupport.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTestSupport.java index 877d5a7bdf..965804c215 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTestSupport.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTestSupport.java @@ -167,8 +167,8 @@ public class MQTTTestSupport extends ActiveMQTestBase { Map params = new HashMap<>(); params.put(TransportConstants.PORT_PROP_NAME, "" + port); params.put(TransportConstants.PROTOCOLS_PROP_NAME, "MQTT"); - TransportConfiguration transportConfiguration = new TransportConfiguration(NETTY_ACCEPTOR_FACTORY, params); - server.getConfiguration().getAcceptorConfigurations().add(transportConfiguration); + + server.getConfiguration().addAcceptorConfiguration("MQTT", "tcp://localhost:" + port + "?protocols=MQTT;anycastPrefix=anycast:;multicastPrefix=multicast:"); LOG.info("Added connector {} to broker", getProtocolScheme()); }