diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java index d287417e93..8d42fe90cd 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java @@ -246,7 +246,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr boolean clientDefined = hasCapabilities(TOPIC, source) || hasCapabilities(QUEUE, source); if (clientDefined) { multicast = hasCapabilities(TOPIC, source); - AddressQueryResult addressQueryResult = sessionSPI.addressQuery(addressToUse.toString(), defaultRoutingType, true); + AddressQueryResult addressQueryResult = sessionSPI.addressQuery(addressToUse.toString(), multicast ? RoutingType.MULTICAST : RoutingType.ANYCAST, true); if (!addressQueryResult.isExists()) { throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.sourceAddressDoesntExist(); } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ClientDefinedMultiConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ClientDefinedMultiConsumerTest.java index 9b5187f002..f4643f0091 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ClientDefinedMultiConsumerTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ClientDefinedMultiConsumerTest.java @@ -301,6 +301,24 @@ public class ClientDefinedMultiConsumerTest extends AmqpClientTestSupport { connection.close(); } + @Test(timeout = 60000) + public void testAddressDoesntExist() throws Exception { + AmqpClient client = createAmqpClient(); + + AmqpConnection connection = addConnection(client.connect("myClientId")); + AmqpSession session = connection.createSession(); + Source source = createNonSharedSource(TerminusDurability.CONFIGURATION); + Source source1 = createSharedSource(TerminusDurability.CONFIGURATION); + AmqpReceiver receiver = session.createMulticastReceiver(source, "myReceiverID", "mySub"); + try { + session.createMulticastReceiver(source1, "myReceiverID", "mySub|2"); + fail("Exception expected"); + } catch (Exception e) { + //expected + } + connection.close(); + } + private Source createNonSharedSource(TerminusDurability terminusDurability) { Source source = new Source(); source.setAddress(address.toString());