ARTEMIS-2229 turns out a valid bug. So restore the fix commit.

this was reverted previously on commit f4436a9f72.

It seems it is ok after further changes into FQQN, so this is being reapplied
This commit is contained in:
Howard Gao 2019-03-13 18:34:20 +08:00 committed by Clebert Suconic
parent e217f9ab6c
commit 9f7d23c36c
3 changed files with 47 additions and 4 deletions

View File

@ -289,9 +289,12 @@ public class AMQPSessionCallback implements SessionCallback {
}
// if auto-create we will return whatever type was used before
if (queueQueryResult.isExists() && !queueQueryResult.isAutoCreated() && queueQueryResult.getRoutingType() != routingType) {
if (queueQueryResult.isExists() && !queueQueryResult.isAutoCreated()) {
//if routingType is null we bypass the check
if (routingType != null && queueQueryResult.getRoutingType() != routingType) {
throw new IllegalStateException("Incorrect Routing Type for queue, expecting: " + routingType);
}
}
return queueQueryResult;
}

View File

@ -347,8 +347,11 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
if (multicast && !routingTypes.contains(RoutingType.MULTICAST)) {
throw new ActiveMQAMQPIllegalStateException("Address " + addressToUse + " is not configured for topic support");
} else if (!multicast && !routingTypes.contains(RoutingType.ANYCAST)) {
//if client specifies fully qualified name that's allowed, don't throw exception.
if (queueNameToUse == null) {
throw new ActiveMQAMQPIllegalStateException("Address " + addressToUse + " is not configured for queue support");
}
}
} else {
// if not we look up the address
AddressQueryResult addressQueryResult = null;
@ -445,7 +448,10 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
}
} else {
if (queueNameToUse != null) {
SimpleString matchingAnycastQueue = getMatchingQueue(queueNameToUse, addressToUse, RoutingType.ANYCAST);
//a queue consumer can receive from a multicast queue if it uses a fully qualified name
//setting routingType to null means do not check the routingType against the Queue's routing type.
routingTypeToUse = null;
SimpleString matchingAnycastQueue = getMatchingQueue(queueNameToUse, addressToUse, null);
if (matchingAnycastQueue != null) {
queue = matchingAnycastQueue;
} else {

View File

@ -199,6 +199,40 @@ public class AmqpFullyQualifiedNameTest extends JMSClientTestSupport {
}
}
@Test
public void testQueueConsumerReceiveTopicUsingFQQN() throws Exception {
SimpleString queueName1 = new SimpleString("sub.queue1");
SimpleString queueName2 = new SimpleString("sub.queue2");
server.createQueue(multicastAddress, RoutingType.MULTICAST, queueName1, null, false, false);
server.createQueue(multicastAddress, RoutingType.MULTICAST, queueName2, null, false, false);
Connection connection = createConnection(false);
try {
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
javax.jms.Queue fqqn1 = session.createQueue(multicastAddress.toString() + "::" + queueName1);
javax.jms.Queue fqqn2 = session.createQueue(multicastAddress.toString() + "::" + queueName2);
MessageConsumer consumer1 = session.createConsumer(fqqn1);
MessageConsumer consumer2 = session.createConsumer(fqqn2);
Topic topic = session.createTopic(multicastAddress.toString());
MessageProducer producer = session.createProducer(topic);
producer.send(session.createMessage());
Message m = consumer1.receive(2000);
assertNotNull(m);
m = consumer2.receive(2000);
assertNotNull(m);
} finally {
connection.close();
}
}
@Test
public void testQueue() throws Exception {
server.getAddressSettingsRepository().addMatch("#", new AddressSettings().setAutoCreateQueues(true).setAutoCreateAddresses(true));