diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java index 0e2cf6dfcb..c3643873a6 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java @@ -278,8 +278,11 @@ public class AMQPSessionCallback implements SessionCallback { } // if auto-create we will return whatever type was used before - if (queueQueryResult.isExists() && !queueQueryResult.isAutoCreated() && queueQueryResult.getRoutingType() != routingType) { - throw new IllegalStateException("Incorrect Routing Type for queue, expecting: " + routingType); + if (queueQueryResult.isExists() && !queueQueryResult.isAutoCreated()) { + //if routingType is null we bypass the check + if (routingType != null && queueQueryResult.getRoutingType() != routingType) { + throw new IllegalStateException("Incorrect Routing Type for queue, expecting: " + routingType); + } } return queueQueryResult; diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java index 4caf2d0047..ea8475f3a1 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java @@ -348,7 +348,10 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr if (multicast && !routingTypes.contains(RoutingType.MULTICAST)) { throw new ActiveMQAMQPIllegalStateException("Address " + addressToUse + " is not configured for topic support"); } else if (!multicast && !routingTypes.contains(RoutingType.ANYCAST)) { - throw new ActiveMQAMQPIllegalStateException("Address " + addressToUse + " is not configured for queue support"); + //if client specifies fully qualified name that's allowed, don't throw exception. + if (queueNameToUse == null) { + throw new ActiveMQAMQPIllegalStateException("Address " + addressToUse + " is not configured for queue support"); + } } } else { // if not we look up the address @@ -446,7 +449,10 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr } } else { if (queueNameToUse != null) { - SimpleString matchingAnycastQueue = getMatchingQueue(queueNameToUse, addressToUse, RoutingType.ANYCAST); + //a queue consumer can receive from a multicast queue if it uses a fully qualified name + //setting routingType to null means do not check the routingType against the Queue's routing type. + routingTypeToUse = null; + SimpleString matchingAnycastQueue = getMatchingQueue(queueNameToUse, addressToUse, null); if (matchingAnycastQueue != null) { queue = matchingAnycastQueue; } else { diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpFullyQualifiedNameTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpFullyQualifiedNameTest.java index 1bcf9e16e2..d8c7b2fdc0 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpFullyQualifiedNameTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpFullyQualifiedNameTest.java @@ -202,6 +202,40 @@ public class AmqpFullyQualifiedNameTest extends JMSClientTestSupport { } } + @Test + public void testQueueConsumerReceiveTopicUsingFQQN() throws Exception { + + SimpleString queueName1 = new SimpleString("sub.queue1"); + SimpleString queueName2 = new SimpleString("sub.queue2"); + server.createQueue(multicastAddress, RoutingType.MULTICAST, queueName1, null, false, false); + server.createQueue(multicastAddress, RoutingType.MULTICAST, queueName2, null, false, false); + Connection connection = createConnection(false); + + try { + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + javax.jms.Queue fqqn1 = session.createQueue(multicastAddress.toString() + "::" + queueName1); + javax.jms.Queue fqqn2 = session.createQueue(multicastAddress.toString() + "::" + queueName2); + + MessageConsumer consumer1 = session.createConsumer(fqqn1); + MessageConsumer consumer2 = session.createConsumer(fqqn2); + + Topic topic = session.createTopic(multicastAddress.toString()); + MessageProducer producer = session.createProducer(topic); + + producer.send(session.createMessage()); + + Message m = consumer1.receive(2000); + assertNotNull(m); + + m = consumer2.receive(2000); + assertNotNull(m); + + } finally { + connection.close(); + } + } + @Test public void testQueue() throws Exception { server.createQueue(anycastAddress, RoutingType.ANYCAST, anycastQ1, null, true, false, -1, false, true);