diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java index 63f5521427..f850cc185e 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java @@ -289,8 +289,11 @@ public class AMQPSessionCallback implements SessionCallback { } // if auto-create we will return whatever type was used before - if (queueQueryResult.isExists() && !queueQueryResult.isAutoCreated() && queueQueryResult.getRoutingType() != routingType) { - throw new IllegalStateException("Incorrect Routing Type for queue, expecting: " + routingType); + if (queueQueryResult.isExists() && !queueQueryResult.isAutoCreated()) { + //if routingType is null we bypass the check + if (routingType != null && queueQueryResult.getRoutingType() != routingType) { + throw new IllegalStateException("Incorrect Routing Type for queue, expecting: " + routingType); + } } return queueQueryResult; diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java index 60d5a5a9f2..580c4ce9e8 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java @@ -347,7 +347,10 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr if (multicast && !routingTypes.contains(RoutingType.MULTICAST)) { throw new ActiveMQAMQPIllegalStateException("Address " + addressToUse + " is not configured for topic support"); } else if (!multicast && !routingTypes.contains(RoutingType.ANYCAST)) { - throw new ActiveMQAMQPIllegalStateException("Address " + addressToUse + " is not configured for queue support"); + //if client specifies fully qualified name that's allowed, don't throw exception. + if (queueNameToUse == null) { + throw new ActiveMQAMQPIllegalStateException("Address " + addressToUse + " is not configured for queue support"); + } } } else { // if not we look up the address @@ -445,7 +448,10 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr } } else { if (queueNameToUse != null) { - SimpleString matchingAnycastQueue = getMatchingQueue(queueNameToUse, addressToUse, RoutingType.ANYCAST); + //a queue consumer can receive from a multicast queue if it uses a fully qualified name + //setting routingType to null means do not check the routingType against the Queue's routing type. + routingTypeToUse = null; + SimpleString matchingAnycastQueue = getMatchingQueue(queueNameToUse, addressToUse, null); if (matchingAnycastQueue != null) { queue = matchingAnycastQueue; } else { diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpFullyQualifiedNameTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpFullyQualifiedNameTest.java index b9adc596c4..ba9ff23dca 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpFullyQualifiedNameTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpFullyQualifiedNameTest.java @@ -199,6 +199,40 @@ public class AmqpFullyQualifiedNameTest extends JMSClientTestSupport { } } + @Test + public void testQueueConsumerReceiveTopicUsingFQQN() throws Exception { + + SimpleString queueName1 = new SimpleString("sub.queue1"); + SimpleString queueName2 = new SimpleString("sub.queue2"); + server.createQueue(multicastAddress, RoutingType.MULTICAST, queueName1, null, false, false); + server.createQueue(multicastAddress, RoutingType.MULTICAST, queueName2, null, false, false); + Connection connection = createConnection(false); + + try { + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + javax.jms.Queue fqqn1 = session.createQueue(multicastAddress.toString() + "::" + queueName1); + javax.jms.Queue fqqn2 = session.createQueue(multicastAddress.toString() + "::" + queueName2); + + MessageConsumer consumer1 = session.createConsumer(fqqn1); + MessageConsumer consumer2 = session.createConsumer(fqqn2); + + Topic topic = session.createTopic(multicastAddress.toString()); + MessageProducer producer = session.createProducer(topic); + + producer.send(session.createMessage()); + + Message m = consumer1.receive(2000); + assertNotNull(m); + + m = consumer2.receive(2000); + assertNotNull(m); + + } finally { + connection.close(); + } + } + @Test public void testQueue() throws Exception { server.getAddressSettingsRepository().addMatch("#", new AddressSettings().setAutoCreateQueues(true).setAutoCreateAddresses(true));