ARTEMIS-2910: consider message annotations when determining routing type used for auto-creation with anonymous producers

This commit is contained in:
Robbie Gemmell 2020-09-21 18:07:26 +01:00
parent b89690813d
commit d9d98dfa8a
4 changed files with 304 additions and 6 deletions

View File

@ -464,6 +464,7 @@ public class AMQPSessionCallback implements SessionCallback {
context.incrementSettle(); context.incrementSettle();
RoutingType routingType = null;
if (address != null) { if (address != null) {
message.setAddress(address); message.setAddress(address);
} else { } else {
@ -474,10 +475,15 @@ public class AMQPSessionCallback implements SessionCallback {
rejectMessage(context, delivery, Symbol.valueOf("failed"), "Missing 'to' field for message sent to an anonymous producer"); rejectMessage(context, delivery, Symbol.valueOf("failed"), "Missing 'to' field for message sent to an anonymous producer");
return; return;
} }
routingType = message.getRoutingType();
} }
//here check queue-autocreation //here check queue-autocreation
RoutingType routingType = context.getRoutingType(receiver, address); if (routingType == null) {
routingType = context.getRoutingType(receiver, address);
}
if (!checkAddressAndAutocreateIfPossible(address, routingType)) { if (!checkAddressAndAutocreateIfPossible(address, routingType)) {
throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.addressDoesntExist(); throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.addressDoesntExist();
} }

View File

@ -32,6 +32,7 @@ import org.apache.qpid.proton.amqp.messaging.DeliveryAnnotations;
import org.apache.qpid.proton.amqp.messaging.Header; import org.apache.qpid.proton.amqp.messaging.Header;
import org.apache.qpid.proton.amqp.messaging.MessageAnnotations; import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
import org.apache.qpid.proton.amqp.messaging.Properties; import org.apache.qpid.proton.amqp.messaging.Properties;
import org.apache.qpid.proton.amqp.messaging.Section;
import org.apache.qpid.proton.engine.Delivery; import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.message.Message; import org.apache.qpid.proton.message.Message;
@ -611,6 +612,25 @@ public class AmqpMessage {
getWrappedMessage().setBody(body); getWrappedMessage().setBody(body);
} }
/**
* Attempts to retrieve the message body as a String from an AmqpValue body.
*
* @return the string
* @throws NoSuchElementException if the body does not contain a AmqpValue with String.
*/
public String getText() throws NoSuchElementException {
Section body = getWrappedMessage().getBody();
if (body instanceof AmqpValue) {
AmqpValue value = (AmqpValue) body;
if (value.getValue() instanceof String) {
return (String) value.getValue();
}
}
throw new NoSuchElementException("Message does not contain a String body");
}
/** /**
* Sets a byte array value into the body of an outgoing Message, throws * Sets a byte array value into the body of an outgoing Message, throws
* an exception if this is an incoming message instance. * an exception if this is an incoming message instance.

View File

@ -18,6 +18,12 @@ package org.apache.activemq.artemis.tests.integration.amqp;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.AddressQueryResult;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport;
import org.apache.activemq.transport.amqp.client.AmqpClient; import org.apache.activemq.transport.amqp.client.AmqpClient;
import org.apache.activemq.transport.amqp.client.AmqpConnection; import org.apache.activemq.transport.amqp.client.AmqpConnection;
import org.apache.activemq.transport.amqp.client.AmqpMessage; import org.apache.activemq.transport.amqp.client.AmqpMessage;
@ -28,6 +34,10 @@ import org.junit.Test;
public class AmqpAnonymousRelayTest extends AmqpClientTestSupport { public class AmqpAnonymousRelayTest extends AmqpClientTestSupport {
private static final String AUTO_CREATION_QUEUE_PREFIX = "AmqpAnonymousRelayTest-AutoCreateQueues.";
private static final String AUTO_CREATION_TOPIC_PREFIX = "AmqpAnonymousRelayTest-AutoCreateTopics.";
// Disable auto-creation in the general config created by the superclass, we add specific prefixed areas with it enabled
@Override @Override
protected boolean isAutoCreateQueues() { protected boolean isAutoCreateQueues() {
return false; return false;
@ -38,6 +48,232 @@ public class AmqpAnonymousRelayTest extends AmqpClientTestSupport {
return false; return false;
} }
// Additional address configuration for auto creation of queues and topics
@Override
protected void configureAddressPolicy(ActiveMQServer server) {
super.configureAddressPolicy(server);
AddressSettings autoCreateQueueAddressSettings = new AddressSettings();
autoCreateQueueAddressSettings.setAutoCreateQueues(true);
autoCreateQueueAddressSettings.setAutoCreateAddresses(true);
autoCreateQueueAddressSettings.setDefaultAddressRoutingType(RoutingType.ANYCAST);
autoCreateQueueAddressSettings.setDefaultQueueRoutingType(RoutingType.ANYCAST);
server.getConfiguration().getAddressesSettings().put(AUTO_CREATION_QUEUE_PREFIX + "#", autoCreateQueueAddressSettings);
AddressSettings autoCreateTopicAddressSettings = new AddressSettings();
autoCreateTopicAddressSettings.setAutoCreateQueues(true);
autoCreateTopicAddressSettings.setAutoCreateAddresses(true);
autoCreateTopicAddressSettings.setDefaultAddressRoutingType(RoutingType.MULTICAST);
autoCreateTopicAddressSettings.setDefaultQueueRoutingType(RoutingType.MULTICAST);
server.getConfiguration().getAddressesSettings().put(AUTO_CREATION_TOPIC_PREFIX + "#", autoCreateTopicAddressSettings);
}
@Test(timeout = 60000)
public void testSendMessageOnAnonymousProducerCausesQueueAutoCreation() throws Exception {
AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect());
// We use an address in the QUEUE prefixed auto-creation area to ensure the broker picks this up
// and creates a queue, in the absense of any other message annotation / terminus capability config.
String queueName = AUTO_CREATION_QUEUE_PREFIX + getQueueName();
try {
AmqpSession session = connection.createSession();
AmqpSender sender = session.createAnonymousSender();
AmqpMessage message = new AmqpMessage();
message.setAddress(queueName);
message.setText(getTestName());
AddressQueryResult addressQueryResult = server.addressQuery(SimpleString.toSimpleString(queueName));
assertFalse(addressQueryResult.isExists());
sender.send(message);
sender.close();
addressQueryResult = server.addressQuery(SimpleString.toSimpleString(queueName));
assertTrue(addressQueryResult.isExists());
assertTrue(addressQueryResult.getRoutingTypes().contains(RoutingType.ANYCAST));
assertTrue(addressQueryResult.isAutoCreated());
// Create a receiver and verify it can consume the message from the auto-created queue
AmqpReceiver receiver = session.createReceiver(queueName);
receiver.flow(1);
AmqpMessage received = receiver.receive(10, TimeUnit.SECONDS);
assertNotNull("Should have read message", received);
assertEquals(getTestName(), received.getText());
received.accept();
receiver.close();
} finally {
connection.close();
}
}
@Test(timeout = 60000)
public void testSendMessageOnAnonymousProducerCausesTopicAutoCreation() throws Exception {
AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect());
// We use an address in the TOPIC prefixed auto-creation area to ensure the broker picks this up
// and creates a topic, in the absense of any other message annotation / terminus capability config.
String topicName = AUTO_CREATION_TOPIC_PREFIX + getTopicName();
try {
AmqpSession session = connection.createSession();
AmqpSender sender = session.createAnonymousSender();
AmqpMessage message = new AmqpMessage();
message.setAddress(topicName);
message.setText("creating-topic-address");
AddressQueryResult addressQueryResult = server.addressQuery(SimpleString.toSimpleString(topicName));
assertFalse(addressQueryResult.isExists());
sender.send(message);
addressQueryResult = server.addressQuery(SimpleString.toSimpleString(topicName));
assertTrue(addressQueryResult.isExists());
assertTrue(addressQueryResult.getRoutingTypes().contains(RoutingType.MULTICAST));
assertTrue(addressQueryResult.isAutoCreated());
// Create 2 receivers and verify they can both consume a new message sent to the auto-created topic
AmqpReceiver receiver1 = session.createReceiver(topicName);
AmqpReceiver receiver2 = session.createReceiver(topicName);
receiver1.flow(1);
receiver2.flow(1);
AmqpMessage message2 = new AmqpMessage();
message2.setAddress(topicName);
message2.setText(getTestName());
sender.send(message2);
AmqpMessage received1 = receiver1.receive(10, TimeUnit.SECONDS);
assertNotNull("Should have read message", received1);
assertEquals(getTestName(), received1.getText());
received1.accept();
AmqpMessage received2 = receiver2.receive(10, TimeUnit.SECONDS);
assertNotNull("Should have read message", received2);
assertEquals(getTestName(), received2.getText());
received1.accept();
receiver1.close();
receiver2.close();
sender.close();
} finally {
connection.close();
}
}
@Test(timeout = 60000)
public void testSendMessageOnAnonymousProducerWithDestinationTypeAnnotationCausesQueueAutoCreation() throws Exception {
AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect());
try {
AmqpSession session = connection.createSession();
AmqpSender sender = session.createAnonymousSender();
AmqpMessage message = new AmqpMessage();
message.setMessageAnnotation(AMQPMessageSupport.JMS_DEST_TYPE_MSG_ANNOTATION.toString(), AMQPMessageSupport.QUEUE_TYPE);
// We deliberately use the TOPIC prefixed auto-creation area, not the QUEUE prefix, to ensure
// we get a queue because the broker inspects the value we send on the message, and not just
// because it was taken as a default from the address settings.
String queueName = AUTO_CREATION_TOPIC_PREFIX + getQueueName();
message.setAddress(queueName);
message.setText(getTestName());
AddressQueryResult addressQueryResult = server.addressQuery(SimpleString.toSimpleString(queueName));
assertFalse(addressQueryResult.isExists());
sender.send(message);
sender.close();
addressQueryResult = server.addressQuery(SimpleString.toSimpleString(queueName));
assertTrue(addressQueryResult.isExists());
assertTrue(addressQueryResult.getRoutingTypes().contains(RoutingType.ANYCAST));
assertTrue(addressQueryResult.isAutoCreated());
// Create a receiver and verify it can consume the message from the auto-created queue
AmqpReceiver receiver = session.createReceiver(queueName);
receiver.flow(1);
AmqpMessage received = receiver.receive(10, TimeUnit.SECONDS);
assertNotNull("Should have read message", received);
assertEquals(getTestName(), received.getText());
received.accept();
receiver.close();
} finally {
connection.close();
}
}
@Test(timeout = 60000)
public void testSendMessageOnAnonymousProducerWithDestinationTypeAnnotationCausesTopicAutoCreation() throws Exception {
AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect());
try {
AmqpSession session = connection.createSession();
AmqpSender sender = session.createAnonymousSender();
AmqpMessage message = new AmqpMessage();
message.setMessageAnnotation(AMQPMessageSupport.JMS_DEST_TYPE_MSG_ANNOTATION.toString(), AMQPMessageSupport.TOPIC_TYPE);
// We deliberately use the QUEUE prefixed auto-creation area, not the TOPIC prefix, to ensure
// we get a topic because the broker inspects the value we send on the message, and not just
// because it was taken as a default from the address settings.
String topicName = AUTO_CREATION_QUEUE_PREFIX + getTopicName();
message.setAddress(topicName);
message.setText("creating-topic-address");
AddressQueryResult addressQueryResult = server.addressQuery(SimpleString.toSimpleString(topicName));
assertFalse(addressQueryResult.isExists());
sender.send(message);
addressQueryResult = server.addressQuery(SimpleString.toSimpleString(topicName));
assertTrue(addressQueryResult.isExists());
assertTrue(addressQueryResult.getRoutingTypes().contains(RoutingType.MULTICAST));
assertTrue(addressQueryResult.isAutoCreated());
// Create 2 receivers and verify they can both consume a new message sent to the auto-created topic
AmqpReceiver receiver1 = session.createReceiver(topicName);
AmqpReceiver receiver2 = session.createReceiver(topicName);
receiver1.flow(1);
receiver2.flow(1);
AmqpMessage message2 = new AmqpMessage();
message2.setMessageAnnotation(AMQPMessageSupport.JMS_DEST_TYPE_MSG_ANNOTATION.toString(), AMQPMessageSupport.TOPIC_TYPE);
message2.setAddress(topicName);
message2.setText(getTestName());
sender.send(message2);
AmqpMessage received1 = receiver1.receive(10, TimeUnit.SECONDS);
assertNotNull("Should have read message", received1);
assertEquals(getTestName(), received1.getText());
received1.accept();
AmqpMessage received2 = receiver2.receive(10, TimeUnit.SECONDS);
assertNotNull("Should have read message", received2);
assertEquals(getTestName(), received2.getText());
received1.accept();
receiver1.close();
receiver2.close();
sender.close();
} finally {
connection.close();
}
}
@Test(timeout = 60000) @Test(timeout = 60000)
public void testSendMessageOnAnonymousRelayLinkUsingMessageTo() throws Exception { public void testSendMessageOnAnonymousRelayLinkUsingMessageTo() throws Exception {
AmqpClient client = createAmqpClient(); AmqpClient client = createAmqpClient();

View File

@ -36,6 +36,35 @@ import org.junit.Test;
public class JMSMessageProducerTest extends JMSClientTestSupport { public class JMSMessageProducerTest extends JMSClientTestSupport {
@Test(timeout = 30000)
public void testAnonymousProducerWithQueueAutoCreation() throws Exception {
Connection connection = createConnection();
try {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
String queueName = UUID.randomUUID().toString() + ":" + getQueueName();
Queue queue = session.createQueue(queueName);
MessageProducer p = session.createProducer(null);
TextMessage message = session.createTextMessage();
message.setText(getTestName());
// This will auto-create the address, and be retained for subsequent consumption
p.send(queue, message);
{
MessageConsumer consumer = session.createConsumer(queue);
p.send(queue, message);
Message msg = consumer.receive(2000);
assertNotNull(msg);
assertTrue(msg instanceof TextMessage);
assertEquals(getTestName(), ((TextMessage)msg).getText());
consumer.close();
}
} finally {
connection.close();
}
}
@Test(timeout = 30000) @Test(timeout = 30000)
public void testAnonymousProducer() throws Exception { public void testAnonymousProducer() throws Exception {
Connection connection = createConnection(); Connection connection = createConnection();
@ -71,25 +100,32 @@ public class JMSMessageProducerTest extends JMSClientTestSupport {
} }
@Test(timeout = 30000) @Test(timeout = 30000)
public void testAnonymousProducerWithAutoCreation() throws Exception { public void testAnonymousProducerWithTopicAutoCreation() throws Exception {
Connection connection = createConnection(); Connection connection = createConnection();
try { try {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic(UUID.randomUUID().toString()); String topicName = UUID.randomUUID().toString() + ":" + getQueueName();
Topic topic = session.createTopic(topicName);
MessageProducer p = session.createProducer(null); MessageProducer p = session.createProducer(null);
TextMessage message = session.createTextMessage(); TextMessage message = session.createTextMessage();
message.setText("hello"); message.setText("creating-topic-address");
// this will auto-create the address // This will auto-create the address, but msg will be discarded as there are no consumers
p.send(topic, message); p.send(topic, message);
{ {
// This will create a new consumer, on the topic address, verifying it can attach
// and then receives a further sent message
MessageConsumer consumer = session.createConsumer(topic); MessageConsumer consumer = session.createConsumer(topic);
p.send(topic, message); Message message2 = message = session.createTextMessage(getTestName());
p.send(topic, message2);
Message msg = consumer.receive(2000); Message msg = consumer.receive(2000);
assertNotNull(msg); assertNotNull(msg);
assertTrue(msg instanceof TextMessage); assertTrue(msg instanceof TextMessage);
assertEquals(getTestName(), ((TextMessage)msg).getText());
consumer.close(); consumer.close();
} }
} finally { } finally {