This closes #958

This commit is contained in:
Clebert Suconic 2017-01-12 12:22:15 -05:00
commit c441625e5a
2 changed files with 19 additions and 1 deletions

View File

@ -246,7 +246,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
boolean clientDefined = hasCapabilities(TOPIC, source) || hasCapabilities(QUEUE, source);
if (clientDefined) {
multicast = hasCapabilities(TOPIC, source);
AddressQueryResult addressQueryResult = sessionSPI.addressQuery(addressToUse.toString(), defaultRoutingType, true);
AddressQueryResult addressQueryResult = sessionSPI.addressQuery(addressToUse.toString(), multicast ? RoutingType.MULTICAST : RoutingType.ANYCAST, true);
if (!addressQueryResult.isExists()) {
throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.sourceAddressDoesntExist();
}

View File

@ -301,6 +301,24 @@ public class ClientDefinedMultiConsumerTest extends AmqpClientTestSupport {
connection.close();
}
@Test(timeout = 60000)
public void testAddressDoesntExist() throws Exception {
AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect("myClientId"));
AmqpSession session = connection.createSession();
Source source = createNonSharedSource(TerminusDurability.CONFIGURATION);
Source source1 = createSharedSource(TerminusDurability.CONFIGURATION);
AmqpReceiver receiver = session.createMulticastReceiver(source, "myReceiverID", "mySub");
try {
session.createMulticastReceiver(source1, "myReceiverID", "mySub|2");
fail("Exception expected");
} catch (Exception e) {
//expected
}
connection.close();
}
private Source createNonSharedSource(TerminusDurability terminusDurability) {
Source source = new Source();
source.setAddress(address.toString());