ARTEMIS-1402 AMQP notfound on unmatched FQQN
Return an AMQP not:found error to the client, if the supplied queue in an FQQN belongs to an address other than what is provided in the FQQN.
This commit is contained in:
parent
32ac370edc
commit
152791c230
|
@ -332,13 +332,8 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
|
||||||
supportedFilters.put(filter.getKey(), filter.getValue());
|
supportedFilters.put(filter.getKey(), filter.getValue());
|
||||||
}
|
}
|
||||||
|
|
||||||
if (queueNameToUse != null) {
|
queue = getMatchingQueue(queueNameToUse, addressToUse, RoutingType.MULTICAST);
|
||||||
SimpleString matchingQueue = sessionSPI.getMatchingQueue(addressToUse, queueNameToUse, RoutingType.MULTICAST);
|
|
||||||
if (matchingQueue == null) {
|
|
||||||
throw new ActiveMQAMQPNotFoundException("Queue: '" + queueNameToUse + "' does not exist");
|
|
||||||
}
|
|
||||||
queue = matchingQueue.toString();
|
|
||||||
}
|
|
||||||
//if the address specifies a broker configured queue then we always use this, treat it as a queue
|
//if the address specifies a broker configured queue then we always use this, treat it as a queue
|
||||||
if (queue != null) {
|
if (queue != null) {
|
||||||
multicast = false;
|
multicast = false;
|
||||||
|
@ -392,7 +387,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
if (queueNameToUse != null) {
|
if (queueNameToUse != null) {
|
||||||
SimpleString matchingAnycastQueue = sessionSPI.getMatchingQueue(addressToUse, queueNameToUse, RoutingType.ANYCAST);
|
SimpleString matchingAnycastQueue = SimpleString.toSimpleString(getMatchingQueue(queueNameToUse, addressToUse, RoutingType.ANYCAST));
|
||||||
if (matchingAnycastQueue != null) {
|
if (matchingAnycastQueue != null) {
|
||||||
queue = matchingAnycastQueue.toString();
|
queue = matchingAnycastQueue.toString();
|
||||||
} else {
|
} else {
|
||||||
|
@ -442,6 +437,21 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private String getMatchingQueue(SimpleString queueName, SimpleString address, RoutingType routingType) throws Exception {
|
||||||
|
if (queueName != null) {
|
||||||
|
QueueQueryResult result = sessionSPI.queueQuery(queueName.toString(), routingType, false);
|
||||||
|
if (!result.isExists()) {
|
||||||
|
throw new ActiveMQAMQPNotFoundException("Queue: '" + queueName + "' does not exist");
|
||||||
|
} else {
|
||||||
|
if (!result.getAddress().equals(address)) {
|
||||||
|
throw new ActiveMQAMQPNotFoundException("Queue: '" + queueName + "' does not exist for address '" + address + "'");
|
||||||
|
}
|
||||||
|
return sessionSPI.getMatchingQueue(address, queueName, routingType).toString();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
protected String getClientId() {
|
protected String getClientId() {
|
||||||
return connection.getRemoteContainer();
|
return connection.getRemoteContainer();
|
||||||
}
|
}
|
||||||
|
|
|
@ -89,22 +89,89 @@ public class AmqpFullyQualifiedNameTest extends JMSClientTestSupport {
|
||||||
assertTrue(e.getMessage().contains("Queue: '" + queueName + "' does not exist"));
|
assertTrue(e.getMessage().contains("Queue: '" + queueName + "' does not exist"));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test(timeout = 60000)
|
@Test
|
||||||
//there isn't much use of FQQN for topics
|
public void testConsumeQueueToFQQNWrongQueueAttachedToAnotherAddress() throws Exception {
|
||||||
//however we can test query functionality
|
|
||||||
public void testTopic() throws Exception {
|
|
||||||
|
|
||||||
|
// Create 2 Queues: address1::queue1, address2::queue2
|
||||||
|
String address1 = "a1";
|
||||||
|
String address2 = "a2";
|
||||||
|
String queue1 = "q1";
|
||||||
|
String queue2 = "q2";
|
||||||
|
|
||||||
|
server.createQueue(SimpleString.toSimpleString(address1), RoutingType.ANYCAST, SimpleString.toSimpleString(queue1), null, true, false, -1, false, true);
|
||||||
|
server.createQueue(SimpleString.toSimpleString(address2), RoutingType.ANYCAST, SimpleString.toSimpleString(queue2), null, true, false, -1, false, true);
|
||||||
|
|
||||||
|
Exception e = null;
|
||||||
|
|
||||||
|
// Wrong FQQN. Attempt to subscribe to a queue belonging to a different address than given in the FQQN.
|
||||||
|
String wrongFQQN = address1 + "::" + queue2;
|
||||||
Connection connection = createConnection(false);
|
Connection connection = createConnection(false);
|
||||||
try {
|
try {
|
||||||
connection.setClientID("FQQNconn");
|
connection.setClientID("FQQNconn");
|
||||||
connection.start();
|
connection.start();
|
||||||
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
Topic topic = session.createTopic(multicastAddress.toString() + "::someaddress");
|
javax.jms.Queue queue = session.createQueue(wrongFQQN);
|
||||||
|
session.createConsumer(queue);
|
||||||
|
} catch (InvalidDestinationException ide) {
|
||||||
|
e = ide;
|
||||||
|
} finally {
|
||||||
|
connection.close();
|
||||||
|
}
|
||||||
|
assertNotNull(e);
|
||||||
|
assertTrue(e.getMessage().contains("Queue: '" + queue2 + "' does not exist for address '" + address1 + "'"));
|
||||||
|
}
|
||||||
|
|
||||||
MessageConsumer consumer1 = session.createConsumer(topic);
|
@Test
|
||||||
MessageConsumer consumer2 = session.createConsumer(topic);
|
public void testSubscribeTopicToFQQNWrongQueueAttachedToAnotherAddress() throws Exception {
|
||||||
MessageConsumer consumer3 = session.createConsumer(topic);
|
|
||||||
|
|
||||||
|
// Create 2 Queues: address1::queue1, address2::queue2
|
||||||
|
String address1 = "a1";
|
||||||
|
String address2 = "a2";
|
||||||
|
String queue1 = "q1";
|
||||||
|
String queue2 = "q2";
|
||||||
|
|
||||||
|
server.createQueue(SimpleString.toSimpleString(address1), RoutingType.MULTICAST, SimpleString.toSimpleString(queue1), null, true, false, -1, false, true);
|
||||||
|
server.createQueue(SimpleString.toSimpleString(address2), RoutingType.MULTICAST, SimpleString.toSimpleString(queue2), null, true, false, -1, false, true);
|
||||||
|
|
||||||
|
Exception e = null;
|
||||||
|
|
||||||
|
// Wrong FQQN. Attempt to subscribe to a queue belonging to a different address than given in the FQQN.
|
||||||
|
String wrongFQQN = address1 + "::" + queue2;
|
||||||
|
Connection connection = createConnection(false);
|
||||||
|
try {
|
||||||
|
connection.setClientID("FQQNconn");
|
||||||
|
connection.start();
|
||||||
|
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
|
Topic topic = session.createTopic(wrongFQQN);
|
||||||
|
session.createConsumer(topic);
|
||||||
|
} catch (InvalidDestinationException ide) {
|
||||||
|
e = ide;
|
||||||
|
} finally {
|
||||||
|
connection.close();
|
||||||
|
}
|
||||||
|
assertNotNull(e);
|
||||||
|
assertTrue(e.getMessage().contains("Queue: '" + queue2 + "' does not exist for address '" + address1 + "'"));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 60000)
|
||||||
|
//there isn't much use of FQQN for topics
|
||||||
|
//however we can test query functionality
|
||||||
|
public void testTopic() throws Exception {
|
||||||
|
|
||||||
|
SimpleString queueName = new SimpleString("someAddress");
|
||||||
|
server.createQueue(multicastAddress, RoutingType.MULTICAST, queueName, null, false, false);
|
||||||
|
Connection connection = createConnection(false);
|
||||||
|
|
||||||
|
try {
|
||||||
|
connection.setClientID("FQQNconn");
|
||||||
|
connection.start();
|
||||||
|
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
|
Topic fqqn = session.createTopic(multicastAddress.toString() + "::" + queueName);
|
||||||
|
|
||||||
|
MessageConsumer consumer1 = session.createConsumer(fqqn);
|
||||||
|
MessageConsumer consumer2 = session.createConsumer(fqqn);
|
||||||
|
|
||||||
|
Topic topic = session.createTopic(multicastAddress.toString());
|
||||||
MessageProducer producer = session.createProducer(topic);
|
MessageProducer producer = session.createProducer(topic);
|
||||||
|
|
||||||
producer.send(session.createMessage());
|
producer.send(session.createMessage());
|
||||||
|
@ -112,10 +179,10 @@ public class AmqpFullyQualifiedNameTest extends JMSClientTestSupport {
|
||||||
//each consumer receives one
|
//each consumer receives one
|
||||||
Message m = consumer1.receive(2000);
|
Message m = consumer1.receive(2000);
|
||||||
assertNotNull(m);
|
assertNotNull(m);
|
||||||
|
|
||||||
|
// Subscribing to FQQN is akin to shared subscription
|
||||||
m = consumer2.receive(2000);
|
m = consumer2.receive(2000);
|
||||||
assertNotNull(m);
|
assertNull(m);
|
||||||
m = consumer3.receive(2000);
|
|
||||||
assertNotNull(m);
|
|
||||||
|
|
||||||
Bindings bindings = server.getPostOffice().getBindingsForAddress(multicastAddress);
|
Bindings bindings = server.getPostOffice().getBindingsForAddress(multicastAddress);
|
||||||
for (Binding b : bindings.getBindings()) {
|
for (Binding b : bindings.getBindings()) {
|
||||||
|
@ -192,11 +259,16 @@ public class AmqpFullyQualifiedNameTest extends JMSClientTestSupport {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Broker should return exception if no address is passed in FQQN.
|
||||||
|
* @throws Exception
|
||||||
|
*/
|
||||||
@Test
|
@Test
|
||||||
public void testQueueSpecial() throws Exception {
|
public void testQueueSpecial() throws Exception {
|
||||||
server.createQueue(anycastAddress, RoutingType.ANYCAST, anycastQ1, null, true, false, -1, false, true);
|
server.createQueue(anycastAddress, RoutingType.ANYCAST, anycastQ1, null, true, false, -1, false, true);
|
||||||
|
|
||||||
Connection connection = createConnection();
|
Connection connection = createConnection();
|
||||||
|
Exception expectedException = null;
|
||||||
try {
|
try {
|
||||||
connection.start();
|
connection.start();
|
||||||
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
|
@ -204,39 +276,11 @@ public class AmqpFullyQualifiedNameTest extends JMSClientTestSupport {
|
||||||
//::queue ok!
|
//::queue ok!
|
||||||
String specialName = CompositeAddress.toFullQN(new SimpleString(""), anycastQ1).toString();
|
String specialName = CompositeAddress.toFullQN(new SimpleString(""), anycastQ1).toString();
|
||||||
javax.jms.Queue q1 = session.createQueue(specialName);
|
javax.jms.Queue q1 = session.createQueue(specialName);
|
||||||
|
|
||||||
ClientSessionFactory cf = createSessionFactory(locator);
|
|
||||||
ClientSession coreSession = cf.createSession();
|
|
||||||
|
|
||||||
ClientProducer coreProducer = coreSession.createProducer(anycastAddress);
|
|
||||||
sendMessages(coreSession, coreProducer, 1);
|
|
||||||
|
|
||||||
System.out.println("create consumer: " + q1);
|
|
||||||
MessageConsumer consumer1 = session.createConsumer(q1);
|
|
||||||
|
|
||||||
assertNotNull(consumer1.receive(2000));
|
|
||||||
|
|
||||||
//queue::
|
|
||||||
specialName = CompositeAddress.toFullQN(anycastQ1, new SimpleString("")).toString();
|
|
||||||
q1 = session.createQueue(specialName);
|
|
||||||
try {
|
|
||||||
session.createConsumer(q1);
|
session.createConsumer(q1);
|
||||||
fail("should get exception");
|
|
||||||
} catch (InvalidDestinationException e) {
|
} catch (InvalidDestinationException e) {
|
||||||
//expected
|
expectedException = e;
|
||||||
}
|
|
||||||
|
|
||||||
//::
|
|
||||||
specialName = CompositeAddress.toFullQN(new SimpleString(""), new SimpleString("")).toString();
|
|
||||||
q1 = session.createQueue(specialName);
|
|
||||||
try {
|
|
||||||
session.createConsumer(q1);
|
|
||||||
fail("should get exception");
|
|
||||||
} catch (InvalidDestinationException e) {
|
|
||||||
//expected
|
|
||||||
}
|
|
||||||
} finally {
|
|
||||||
connection.close();
|
|
||||||
}
|
}
|
||||||
|
assertNotNull(expectedException);
|
||||||
|
assertTrue(expectedException.getMessage().contains("Queue: 'q1' does not exist for address ''"));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue