From d9d98dfa8ac50468acd7c06194d9fb7ebda71dd9 Mon Sep 17 00:00:00 2001 From: Robbie Gemmell Date: Mon, 21 Sep 2020 18:07:26 +0100 Subject: [PATCH] ARTEMIS-2910: consider message annotations when determining routing type used for auto-creation with anonymous producers --- .../amqp/broker/AMQPSessionCallback.java | 8 +- .../transport/amqp/client/AmqpMessage.java | 20 ++ .../amqp/AmqpAnonymousRelayTest.java | 236 ++++++++++++++++++ .../amqp/JMSMessageProducerTest.java | 46 +++- 4 files changed, 304 insertions(+), 6 deletions(-) diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java index bd17817792..25cb4ab223 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java @@ -464,6 +464,7 @@ public class AMQPSessionCallback implements SessionCallback { context.incrementSettle(); + RoutingType routingType = null; if (address != null) { message.setAddress(address); } else { @@ -474,10 +475,15 @@ public class AMQPSessionCallback implements SessionCallback { rejectMessage(context, delivery, Symbol.valueOf("failed"), "Missing 'to' field for message sent to an anonymous producer"); return; } + + routingType = message.getRoutingType(); } //here check queue-autocreation - RoutingType routingType = context.getRoutingType(receiver, address); + if (routingType == null) { + routingType = context.getRoutingType(receiver, address); + } + if (!checkAddressAndAutocreateIfPossible(address, routingType)) { throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.addressDoesntExist(); } diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java index bd5551cbd9..99ade07213 100644 --- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java +++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java @@ -32,6 +32,7 @@ import org.apache.qpid.proton.amqp.messaging.DeliveryAnnotations; import org.apache.qpid.proton.amqp.messaging.Header; import org.apache.qpid.proton.amqp.messaging.MessageAnnotations; import org.apache.qpid.proton.amqp.messaging.Properties; +import org.apache.qpid.proton.amqp.messaging.Section; import org.apache.qpid.proton.engine.Delivery; import org.apache.qpid.proton.message.Message; @@ -611,6 +612,25 @@ public class AmqpMessage { getWrappedMessage().setBody(body); } + /** + * Attempts to retrieve the message body as a String from an AmqpValue body. + * + * @return the string + * @throws NoSuchElementException if the body does not contain a AmqpValue with String. + */ + public String getText() throws NoSuchElementException { + Section body = getWrappedMessage().getBody(); + if (body instanceof AmqpValue) { + AmqpValue value = (AmqpValue) body; + + if (value.getValue() instanceof String) { + return (String) value.getValue(); + } + } + + throw new NoSuchElementException("Message does not contain a String body"); + } + /** * Sets a byte array value into the body of an outgoing Message, throws * an exception if this is an incoming message instance. diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpAnonymousRelayTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpAnonymousRelayTest.java index 1743624ffd..98938d7052 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpAnonymousRelayTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpAnonymousRelayTest.java @@ -18,6 +18,12 @@ package org.apache.activemq.artemis.tests.integration.amqp; import java.util.concurrent.TimeUnit; +import org.apache.activemq.artemis.api.core.RoutingType; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.AddressQueryResult; +import org.apache.activemq.artemis.core.settings.impl.AddressSettings; +import org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport; import org.apache.activemq.transport.amqp.client.AmqpClient; import org.apache.activemq.transport.amqp.client.AmqpConnection; import org.apache.activemq.transport.amqp.client.AmqpMessage; @@ -28,6 +34,10 @@ import org.junit.Test; public class AmqpAnonymousRelayTest extends AmqpClientTestSupport { + private static final String AUTO_CREATION_QUEUE_PREFIX = "AmqpAnonymousRelayTest-AutoCreateQueues."; + private static final String AUTO_CREATION_TOPIC_PREFIX = "AmqpAnonymousRelayTest-AutoCreateTopics."; + + // Disable auto-creation in the general config created by the superclass, we add specific prefixed areas with it enabled @Override protected boolean isAutoCreateQueues() { return false; @@ -38,6 +48,232 @@ public class AmqpAnonymousRelayTest extends AmqpClientTestSupport { return false; } + // Additional address configuration for auto creation of queues and topics + @Override + protected void configureAddressPolicy(ActiveMQServer server) { + super.configureAddressPolicy(server); + + AddressSettings autoCreateQueueAddressSettings = new AddressSettings(); + autoCreateQueueAddressSettings.setAutoCreateQueues(true); + autoCreateQueueAddressSettings.setAutoCreateAddresses(true); + autoCreateQueueAddressSettings.setDefaultAddressRoutingType(RoutingType.ANYCAST); + autoCreateQueueAddressSettings.setDefaultQueueRoutingType(RoutingType.ANYCAST); + + server.getConfiguration().getAddressesSettings().put(AUTO_CREATION_QUEUE_PREFIX + "#", autoCreateQueueAddressSettings); + + AddressSettings autoCreateTopicAddressSettings = new AddressSettings(); + autoCreateTopicAddressSettings.setAutoCreateQueues(true); + autoCreateTopicAddressSettings.setAutoCreateAddresses(true); + autoCreateTopicAddressSettings.setDefaultAddressRoutingType(RoutingType.MULTICAST); + autoCreateTopicAddressSettings.setDefaultQueueRoutingType(RoutingType.MULTICAST); + + server.getConfiguration().getAddressesSettings().put(AUTO_CREATION_TOPIC_PREFIX + "#", autoCreateTopicAddressSettings); + } + + @Test(timeout = 60000) + public void testSendMessageOnAnonymousProducerCausesQueueAutoCreation() throws Exception { + AmqpClient client = createAmqpClient(); + AmqpConnection connection = addConnection(client.connect()); + + // We use an address in the QUEUE prefixed auto-creation area to ensure the broker picks this up + // and creates a queue, in the absense of any other message annotation / terminus capability config. + String queueName = AUTO_CREATION_QUEUE_PREFIX + getQueueName(); + try { + AmqpSession session = connection.createSession(); + + AmqpSender sender = session.createAnonymousSender(); + AmqpMessage message = new AmqpMessage(); + message.setAddress(queueName); + message.setText(getTestName()); + + AddressQueryResult addressQueryResult = server.addressQuery(SimpleString.toSimpleString(queueName)); + assertFalse(addressQueryResult.isExists()); + + sender.send(message); + sender.close(); + + addressQueryResult = server.addressQuery(SimpleString.toSimpleString(queueName)); + assertTrue(addressQueryResult.isExists()); + assertTrue(addressQueryResult.getRoutingTypes().contains(RoutingType.ANYCAST)); + assertTrue(addressQueryResult.isAutoCreated()); + + // Create a receiver and verify it can consume the message from the auto-created queue + AmqpReceiver receiver = session.createReceiver(queueName); + receiver.flow(1); + AmqpMessage received = receiver.receive(10, TimeUnit.SECONDS); + assertNotNull("Should have read message", received); + assertEquals(getTestName(), received.getText()); + received.accept(); + + receiver.close(); + } finally { + connection.close(); + } + } + + @Test(timeout = 60000) + public void testSendMessageOnAnonymousProducerCausesTopicAutoCreation() throws Exception { + AmqpClient client = createAmqpClient(); + AmqpConnection connection = addConnection(client.connect()); + + // We use an address in the TOPIC prefixed auto-creation area to ensure the broker picks this up + // and creates a topic, in the absense of any other message annotation / terminus capability config. + String topicName = AUTO_CREATION_TOPIC_PREFIX + getTopicName(); + + try { + AmqpSession session = connection.createSession(); + + AmqpSender sender = session.createAnonymousSender(); + AmqpMessage message = new AmqpMessage(); + + message.setAddress(topicName); + message.setText("creating-topic-address"); + + AddressQueryResult addressQueryResult = server.addressQuery(SimpleString.toSimpleString(topicName)); + assertFalse(addressQueryResult.isExists()); + + sender.send(message); + + addressQueryResult = server.addressQuery(SimpleString.toSimpleString(topicName)); + assertTrue(addressQueryResult.isExists()); + assertTrue(addressQueryResult.getRoutingTypes().contains(RoutingType.MULTICAST)); + assertTrue(addressQueryResult.isAutoCreated()); + + // Create 2 receivers and verify they can both consume a new message sent to the auto-created topic + AmqpReceiver receiver1 = session.createReceiver(topicName); + AmqpReceiver receiver2 = session.createReceiver(topicName); + receiver1.flow(1); + receiver2.flow(1); + + AmqpMessage message2 = new AmqpMessage(); + message2.setAddress(topicName); + message2.setText(getTestName()); + + sender.send(message2); + + AmqpMessage received1 = receiver1.receive(10, TimeUnit.SECONDS); + assertNotNull("Should have read message", received1); + assertEquals(getTestName(), received1.getText()); + received1.accept(); + + AmqpMessage received2 = receiver2.receive(10, TimeUnit.SECONDS); + assertNotNull("Should have read message", received2); + assertEquals(getTestName(), received2.getText()); + received1.accept(); + + receiver1.close(); + receiver2.close(); + sender.close(); + } finally { + connection.close(); + } + } + + @Test(timeout = 60000) + public void testSendMessageOnAnonymousProducerWithDestinationTypeAnnotationCausesQueueAutoCreation() throws Exception { + AmqpClient client = createAmqpClient(); + AmqpConnection connection = addConnection(client.connect()); + + try { + AmqpSession session = connection.createSession(); + + AmqpSender sender = session.createAnonymousSender(); + AmqpMessage message = new AmqpMessage(); + message.setMessageAnnotation(AMQPMessageSupport.JMS_DEST_TYPE_MSG_ANNOTATION.toString(), AMQPMessageSupport.QUEUE_TYPE); + + // We deliberately use the TOPIC prefixed auto-creation area, not the QUEUE prefix, to ensure + // we get a queue because the broker inspects the value we send on the message, and not just + // because it was taken as a default from the address settings. + String queueName = AUTO_CREATION_TOPIC_PREFIX + getQueueName(); + + message.setAddress(queueName); + message.setText(getTestName()); + + AddressQueryResult addressQueryResult = server.addressQuery(SimpleString.toSimpleString(queueName)); + assertFalse(addressQueryResult.isExists()); + + sender.send(message); + sender.close(); + + addressQueryResult = server.addressQuery(SimpleString.toSimpleString(queueName)); + assertTrue(addressQueryResult.isExists()); + assertTrue(addressQueryResult.getRoutingTypes().contains(RoutingType.ANYCAST)); + assertTrue(addressQueryResult.isAutoCreated()); + + // Create a receiver and verify it can consume the message from the auto-created queue + AmqpReceiver receiver = session.createReceiver(queueName); + receiver.flow(1); + AmqpMessage received = receiver.receive(10, TimeUnit.SECONDS); + assertNotNull("Should have read message", received); + assertEquals(getTestName(), received.getText()); + received.accept(); + + receiver.close(); + } finally { + connection.close(); + } + } + + @Test(timeout = 60000) + public void testSendMessageOnAnonymousProducerWithDestinationTypeAnnotationCausesTopicAutoCreation() throws Exception { + AmqpClient client = createAmqpClient(); + AmqpConnection connection = addConnection(client.connect()); + + try { + AmqpSession session = connection.createSession(); + + AmqpSender sender = session.createAnonymousSender(); + AmqpMessage message = new AmqpMessage(); + message.setMessageAnnotation(AMQPMessageSupport.JMS_DEST_TYPE_MSG_ANNOTATION.toString(), AMQPMessageSupport.TOPIC_TYPE); + + // We deliberately use the QUEUE prefixed auto-creation area, not the TOPIC prefix, to ensure + // we get a topic because the broker inspects the value we send on the message, and not just + // because it was taken as a default from the address settings. + String topicName = AUTO_CREATION_QUEUE_PREFIX + getTopicName(); + message.setAddress(topicName); + message.setText("creating-topic-address"); + + AddressQueryResult addressQueryResult = server.addressQuery(SimpleString.toSimpleString(topicName)); + assertFalse(addressQueryResult.isExists()); + + sender.send(message); + + addressQueryResult = server.addressQuery(SimpleString.toSimpleString(topicName)); + assertTrue(addressQueryResult.isExists()); + assertTrue(addressQueryResult.getRoutingTypes().contains(RoutingType.MULTICAST)); + assertTrue(addressQueryResult.isAutoCreated()); + + // Create 2 receivers and verify they can both consume a new message sent to the auto-created topic + AmqpReceiver receiver1 = session.createReceiver(topicName); + AmqpReceiver receiver2 = session.createReceiver(topicName); + receiver1.flow(1); + receiver2.flow(1); + + AmqpMessage message2 = new AmqpMessage(); + message2.setMessageAnnotation(AMQPMessageSupport.JMS_DEST_TYPE_MSG_ANNOTATION.toString(), AMQPMessageSupport.TOPIC_TYPE); + message2.setAddress(topicName); + message2.setText(getTestName()); + + sender.send(message2); + + AmqpMessage received1 = receiver1.receive(10, TimeUnit.SECONDS); + assertNotNull("Should have read message", received1); + assertEquals(getTestName(), received1.getText()); + received1.accept(); + + AmqpMessage received2 = receiver2.receive(10, TimeUnit.SECONDS); + assertNotNull("Should have read message", received2); + assertEquals(getTestName(), received2.getText()); + received1.accept(); + + receiver1.close(); + receiver2.close(); + sender.close(); + } finally { + connection.close(); + } + } + @Test(timeout = 60000) public void testSendMessageOnAnonymousRelayLinkUsingMessageTo() throws Exception { AmqpClient client = createAmqpClient(); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSMessageProducerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSMessageProducerTest.java index 2125ed811f..fddc19c62e 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSMessageProducerTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSMessageProducerTest.java @@ -36,6 +36,35 @@ import org.junit.Test; public class JMSMessageProducerTest extends JMSClientTestSupport { + @Test(timeout = 30000) + public void testAnonymousProducerWithQueueAutoCreation() throws Exception { + Connection connection = createConnection(); + + try { + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + String queueName = UUID.randomUUID().toString() + ":" + getQueueName(); + Queue queue = session.createQueue(queueName); + MessageProducer p = session.createProducer(null); + + TextMessage message = session.createTextMessage(); + message.setText(getTestName()); + // This will auto-create the address, and be retained for subsequent consumption + p.send(queue, message); + + { + MessageConsumer consumer = session.createConsumer(queue); + p.send(queue, message); + Message msg = consumer.receive(2000); + assertNotNull(msg); + assertTrue(msg instanceof TextMessage); + assertEquals(getTestName(), ((TextMessage)msg).getText()); + consumer.close(); + } + } finally { + connection.close(); + } + } + @Test(timeout = 30000) public void testAnonymousProducer() throws Exception { Connection connection = createConnection(); @@ -71,25 +100,32 @@ public class JMSMessageProducerTest extends JMSClientTestSupport { } @Test(timeout = 30000) - public void testAnonymousProducerWithAutoCreation() throws Exception { + public void testAnonymousProducerWithTopicAutoCreation() throws Exception { Connection connection = createConnection(); try { Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - Topic topic = session.createTopic(UUID.randomUUID().toString()); + String topicName = UUID.randomUUID().toString() + ":" + getQueueName(); + Topic topic = session.createTopic(topicName); MessageProducer p = session.createProducer(null); TextMessage message = session.createTextMessage(); - message.setText("hello"); - // this will auto-create the address + message.setText("creating-topic-address"); + // This will auto-create the address, but msg will be discarded as there are no consumers p.send(topic, message); { + // This will create a new consumer, on the topic address, verifying it can attach + // and then receives a further sent message MessageConsumer consumer = session.createConsumer(topic); - p.send(topic, message); + Message message2 = message = session.createTextMessage(getTestName()); + + p.send(topic, message2); + Message msg = consumer.receive(2000); assertNotNull(msg); assertTrue(msg instanceof TextMessage); + assertEquals(getTestName(), ((TextMessage)msg).getText()); consumer.close(); } } finally {