From 152791c230bd9c2d79a190fc555772f32472426f Mon Sep 17 00:00:00 2001 From: Martyn Taylor Date: Thu, 7 Sep 2017 10:40:27 +0100 Subject: [PATCH] ARTEMIS-1402 AMQP notfound on unmatched FQQN Return an AMQP not:found error to the client, if the supplied queue in an FQQN belongs to an address other than what is provided in the FQQN. --- .../proton/ProtonServerSenderContext.java | 26 ++-- .../amqp/AmqpFullyQualifiedNameTest.java | 132 ++++++++++++------ 2 files changed, 106 insertions(+), 52 deletions(-) diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java index d2a097c23c..fbaae8ab42 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java @@ -332,13 +332,8 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr supportedFilters.put(filter.getKey(), filter.getValue()); } - if (queueNameToUse != null) { - SimpleString matchingQueue = sessionSPI.getMatchingQueue(addressToUse, queueNameToUse, RoutingType.MULTICAST); - if (matchingQueue == null) { - throw new ActiveMQAMQPNotFoundException("Queue: '" + queueNameToUse + "' does not exist"); - } - queue = matchingQueue.toString(); - } + queue = getMatchingQueue(queueNameToUse, addressToUse, RoutingType.MULTICAST); + //if the address specifies a broker configured queue then we always use this, treat it as a queue if (queue != null) { multicast = false; @@ -392,7 +387,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr } } else { if (queueNameToUse != null) { - SimpleString matchingAnycastQueue = sessionSPI.getMatchingQueue(addressToUse, queueNameToUse, RoutingType.ANYCAST); + SimpleString matchingAnycastQueue = SimpleString.toSimpleString(getMatchingQueue(queueNameToUse, addressToUse, RoutingType.ANYCAST)); if (matchingAnycastQueue != null) { queue = matchingAnycastQueue.toString(); } else { @@ -442,6 +437,21 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr } } + private String getMatchingQueue(SimpleString queueName, SimpleString address, RoutingType routingType) throws Exception { + if (queueName != null) { + QueueQueryResult result = sessionSPI.queueQuery(queueName.toString(), routingType, false); + if (!result.isExists()) { + throw new ActiveMQAMQPNotFoundException("Queue: '" + queueName + "' does not exist"); + } else { + if (!result.getAddress().equals(address)) { + throw new ActiveMQAMQPNotFoundException("Queue: '" + queueName + "' does not exist for address '" + address + "'"); + } + return sessionSPI.getMatchingQueue(address, queueName, routingType).toString(); + } + } + return null; + } + protected String getClientId() { return connection.getRemoteContainer(); } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpFullyQualifiedNameTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpFullyQualifiedNameTest.java index eaca868cbe..9bb68f0a10 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpFullyQualifiedNameTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpFullyQualifiedNameTest.java @@ -89,22 +89,89 @@ public class AmqpFullyQualifiedNameTest extends JMSClientTestSupport { assertTrue(e.getMessage().contains("Queue: '" + queueName + "' does not exist")); } - @Test(timeout = 60000) - //there isn't much use of FQQN for topics - //however we can test query functionality - public void testTopic() throws Exception { + @Test + public void testConsumeQueueToFQQNWrongQueueAttachedToAnotherAddress() throws Exception { + // Create 2 Queues: address1::queue1, address2::queue2 + String address1 = "a1"; + String address2 = "a2"; + String queue1 = "q1"; + String queue2 = "q2"; + + server.createQueue(SimpleString.toSimpleString(address1), RoutingType.ANYCAST, SimpleString.toSimpleString(queue1), null, true, false, -1, false, true); + server.createQueue(SimpleString.toSimpleString(address2), RoutingType.ANYCAST, SimpleString.toSimpleString(queue2), null, true, false, -1, false, true); + + Exception e = null; + + // Wrong FQQN. Attempt to subscribe to a queue belonging to a different address than given in the FQQN. + String wrongFQQN = address1 + "::" + queue2; Connection connection = createConnection(false); try { connection.setClientID("FQQNconn"); connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - Topic topic = session.createTopic(multicastAddress.toString() + "::someaddress"); + javax.jms.Queue queue = session.createQueue(wrongFQQN); + session.createConsumer(queue); + } catch (InvalidDestinationException ide) { + e = ide; + } finally { + connection.close(); + } + assertNotNull(e); + assertTrue(e.getMessage().contains("Queue: '" + queue2 + "' does not exist for address '" + address1 + "'")); + } - MessageConsumer consumer1 = session.createConsumer(topic); - MessageConsumer consumer2 = session.createConsumer(topic); - MessageConsumer consumer3 = session.createConsumer(topic); + @Test + public void testSubscribeTopicToFQQNWrongQueueAttachedToAnotherAddress() throws Exception { + // Create 2 Queues: address1::queue1, address2::queue2 + String address1 = "a1"; + String address2 = "a2"; + String queue1 = "q1"; + String queue2 = "q2"; + + server.createQueue(SimpleString.toSimpleString(address1), RoutingType.MULTICAST, SimpleString.toSimpleString(queue1), null, true, false, -1, false, true); + server.createQueue(SimpleString.toSimpleString(address2), RoutingType.MULTICAST, SimpleString.toSimpleString(queue2), null, true, false, -1, false, true); + + Exception e = null; + + // Wrong FQQN. Attempt to subscribe to a queue belonging to a different address than given in the FQQN. + String wrongFQQN = address1 + "::" + queue2; + Connection connection = createConnection(false); + try { + connection.setClientID("FQQNconn"); + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Topic topic = session.createTopic(wrongFQQN); + session.createConsumer(topic); + } catch (InvalidDestinationException ide) { + e = ide; + } finally { + connection.close(); + } + assertNotNull(e); + assertTrue(e.getMessage().contains("Queue: '" + queue2 + "' does not exist for address '" + address1 + "'")); + } + + @Test(timeout = 60000) + //there isn't much use of FQQN for topics + //however we can test query functionality + public void testTopic() throws Exception { + + SimpleString queueName = new SimpleString("someAddress"); + server.createQueue(multicastAddress, RoutingType.MULTICAST, queueName, null, false, false); + Connection connection = createConnection(false); + + try { + connection.setClientID("FQQNconn"); + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Topic fqqn = session.createTopic(multicastAddress.toString() + "::" + queueName); + + MessageConsumer consumer1 = session.createConsumer(fqqn); + MessageConsumer consumer2 = session.createConsumer(fqqn); + + Topic topic = session.createTopic(multicastAddress.toString()); MessageProducer producer = session.createProducer(topic); producer.send(session.createMessage()); @@ -112,10 +179,10 @@ public class AmqpFullyQualifiedNameTest extends JMSClientTestSupport { //each consumer receives one Message m = consumer1.receive(2000); assertNotNull(m); + + // Subscribing to FQQN is akin to shared subscription m = consumer2.receive(2000); - assertNotNull(m); - m = consumer3.receive(2000); - assertNotNull(m); + assertNull(m); Bindings bindings = server.getPostOffice().getBindingsForAddress(multicastAddress); for (Binding b : bindings.getBindings()) { @@ -192,11 +259,16 @@ public class AmqpFullyQualifiedNameTest extends JMSClientTestSupport { } } + /** + * Broker should return exception if no address is passed in FQQN. + * @throws Exception + */ @Test public void testQueueSpecial() throws Exception { server.createQueue(anycastAddress, RoutingType.ANYCAST, anycastQ1, null, true, false, -1, false, true); Connection connection = createConnection(); + Exception expectedException = null; try { connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); @@ -204,39 +276,11 @@ public class AmqpFullyQualifiedNameTest extends JMSClientTestSupport { //::queue ok! String specialName = CompositeAddress.toFullQN(new SimpleString(""), anycastQ1).toString(); javax.jms.Queue q1 = session.createQueue(specialName); - - ClientSessionFactory cf = createSessionFactory(locator); - ClientSession coreSession = cf.createSession(); - - ClientProducer coreProducer = coreSession.createProducer(anycastAddress); - sendMessages(coreSession, coreProducer, 1); - - System.out.println("create consumer: " + q1); - MessageConsumer consumer1 = session.createConsumer(q1); - - assertNotNull(consumer1.receive(2000)); - - //queue:: - specialName = CompositeAddress.toFullQN(anycastQ1, new SimpleString("")).toString(); - q1 = session.createQueue(specialName); - try { - session.createConsumer(q1); - fail("should get exception"); - } catch (InvalidDestinationException e) { - //expected - } - - //:: - specialName = CompositeAddress.toFullQN(new SimpleString(""), new SimpleString("")).toString(); - q1 = session.createQueue(specialName); - try { - session.createConsumer(q1); - fail("should get exception"); - } catch (InvalidDestinationException e) { - //expected - } - } finally { - connection.close(); + session.createConsumer(q1); + } catch (InvalidDestinationException e) { + expectedException = e; } + assertNotNull(expectedException); + assertTrue(expectedException.getMessage().contains("Queue: 'q1' does not exist for address ''")); } }