ARTEMIS-2229 Qpid jms consumer cannot receive from multicast queue using FQQN

If a client sends a message to a multicast address and using a qpid-jms
client to receive the message from one of the queues using fully
qualified queue name will fail with following error message:

Address xxxx is not configured for queue support
[condition = amqp:illegal-state]

It should be able to receive the message without any error.
This commit is contained in:
Howard Gao 2019-01-15 21:45:00 +08:00 committed by Clebert Suconic
parent 1e65b295c1
commit 882da19c8a
3 changed files with 47 additions and 4 deletions

View File

@ -278,8 +278,11 @@ public class AMQPSessionCallback implements SessionCallback {
}
// if auto-create we will return whatever type was used before
if (queueQueryResult.isExists() && !queueQueryResult.isAutoCreated() && queueQueryResult.getRoutingType() != routingType) {
throw new IllegalStateException("Incorrect Routing Type for queue, expecting: " + routingType);
if (queueQueryResult.isExists() && !queueQueryResult.isAutoCreated()) {
//if routingType is null we bypass the check
if (routingType != null && queueQueryResult.getRoutingType() != routingType) {
throw new IllegalStateException("Incorrect Routing Type for queue, expecting: " + routingType);
}
}
return queueQueryResult;

View File

@ -348,7 +348,10 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
if (multicast && !routingTypes.contains(RoutingType.MULTICAST)) {
throw new ActiveMQAMQPIllegalStateException("Address " + addressToUse + " is not configured for topic support");
} else if (!multicast && !routingTypes.contains(RoutingType.ANYCAST)) {
throw new ActiveMQAMQPIllegalStateException("Address " + addressToUse + " is not configured for queue support");
//if client specifies fully qualified name that's allowed, don't throw exception.
if (queueNameToUse == null) {
throw new ActiveMQAMQPIllegalStateException("Address " + addressToUse + " is not configured for queue support");
}
}
} else {
// if not we look up the address
@ -446,7 +449,10 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
}
} else {
if (queueNameToUse != null) {
SimpleString matchingAnycastQueue = getMatchingQueue(queueNameToUse, addressToUse, RoutingType.ANYCAST);
//a queue consumer can receive from a multicast queue if it uses a fully qualified name
//setting routingType to null means do not check the routingType against the Queue's routing type.
routingTypeToUse = null;
SimpleString matchingAnycastQueue = getMatchingQueue(queueNameToUse, addressToUse, null);
if (matchingAnycastQueue != null) {
queue = matchingAnycastQueue;
} else {

View File

@ -202,6 +202,40 @@ public class AmqpFullyQualifiedNameTest extends JMSClientTestSupport {
}
}
@Test
public void testQueueConsumerReceiveTopicUsingFQQN() throws Exception {
SimpleString queueName1 = new SimpleString("sub.queue1");
SimpleString queueName2 = new SimpleString("sub.queue2");
server.createQueue(multicastAddress, RoutingType.MULTICAST, queueName1, null, false, false);
server.createQueue(multicastAddress, RoutingType.MULTICAST, queueName2, null, false, false);
Connection connection = createConnection(false);
try {
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
javax.jms.Queue fqqn1 = session.createQueue(multicastAddress.toString() + "::" + queueName1);
javax.jms.Queue fqqn2 = session.createQueue(multicastAddress.toString() + "::" + queueName2);
MessageConsumer consumer1 = session.createConsumer(fqqn1);
MessageConsumer consumer2 = session.createConsumer(fqqn2);
Topic topic = session.createTopic(multicastAddress.toString());
MessageProducer producer = session.createProducer(topic);
producer.send(session.createMessage());
Message m = consumer1.receive(2000);
assertNotNull(m);
m = consumer2.receive(2000);
assertNotNull(m);
} finally {
connection.close();
}
}
@Test
public void testQueue() throws Exception {
server.createQueue(anycastAddress, RoutingType.ANYCAST, anycastQ1, null, true, false, -1, false, true);