ARTEMIS-2372 / ARTEMIS-2740 Improving Message Annotations support in AMQP

- when sending messages to DLQ or Expiry we now use x-opt legal names
- we now support filtering thorugh annotations if using m. as a prefix.
- enabling hyphenated_props: to allow m. as a prefix
This commit is contained in:
Clebert Suconic 2020-05-05 14:23:11 -04:00
parent 88b7ee36a3
commit 4fe4220ff0
10 changed files with 378 additions and 24 deletions

View File

@ -148,6 +148,11 @@ public class TypedProperties {
otherProps.forEachInternal(this::doPutValue);
}
public TypedProperties putProperty(final SimpleString key, final Object value) {
setObjectProperty(key, value, this);
return this;
}
public Object getProperty(final SimpleString key) {
return doGetProperty(key);
}

View File

@ -463,24 +463,24 @@ public interface Message {
}
default void referenceOriginalMessage(final Message original, String originalQueue) {
String queueOnMessage = original.getAnnotationString(Message.HDR_ORIGINAL_QUEUE);
Object queueOnMessage = original.getBrokerProperty(Message.HDR_ORIGINAL_QUEUE);
if (queueOnMessage != null) {
setAnnotation(Message.HDR_ORIGINAL_QUEUE, queueOnMessage);
setBrokerProperty(Message.HDR_ORIGINAL_QUEUE, queueOnMessage);
} else if (originalQueue != null) {
setAnnotation(Message.HDR_ORIGINAL_QUEUE, originalQueue);
setBrokerProperty(Message.HDR_ORIGINAL_QUEUE, originalQueue);
}
Object originalID = original.getAnnotation(Message.HDR_ORIG_MESSAGE_ID);
Object originalID = original.getBrokerProperty(Message.HDR_ORIG_MESSAGE_ID);
if (originalID != null) {
setAnnotation(Message.HDR_ORIGINAL_ADDRESS, original.getAnnotationString(Message.HDR_ORIGINAL_ADDRESS));
setBrokerProperty(Message.HDR_ORIGINAL_ADDRESS, original.getBrokerProperty(Message.HDR_ORIGINAL_ADDRESS));
setAnnotation(Message.HDR_ORIG_MESSAGE_ID, originalID);
setBrokerProperty(Message.HDR_ORIG_MESSAGE_ID, originalID);
} else {
setAnnotation(Message.HDR_ORIGINAL_ADDRESS, original.getAddress());
setBrokerProperty(Message.HDR_ORIGINAL_ADDRESS, original.getAddress());
setAnnotation(Message.HDR_ORIG_MESSAGE_ID, original.getMessageID());
setBrokerProperty(Message.HDR_ORIG_MESSAGE_ID, original.getMessageID());
}
// reset expiry
@ -641,6 +641,17 @@ public interface Message {
return this;
}
/** To be called by the broker on ocasions such as DLQ and expiry.
* When the broker is adding additional properties. */
default Message setBrokerProperty(SimpleString key, Object value) {
putObjectProperty(key, value);
return this;
}
default Object getBrokerProperty(SimpleString key) {
return getObjectProperty(key);
}
Short getShortProperty(SimpleString key) throws ActiveMQPropertyConversionException;
Float getFloatProperty(SimpleString key) throws ActiveMQPropertyConversionException;

View File

@ -104,6 +104,8 @@ import org.jboss.logging.Logger;
*/
public abstract class AMQPMessage extends RefCountMessage implements org.apache.activemq.artemis.api.core.Message {
private static final SimpleString ANNOTATION_AREA_PREFIX = SimpleString.toSimpleString("m.");
protected static final Logger logger = Logger.getLogger(AMQPMessage.class);
public static final SimpleString ADDRESS_PROPERTY = SimpleString.toSimpleString("_AMQ_AD");
@ -275,14 +277,18 @@ public abstract class AMQPMessage extends RefCountMessage implements org.apache.
@Override
public Object getObjectPropertyForFilter(SimpleString key) {
Object value = getObjectProperty(key);
if (value == null) {
value = getMessageAnnotation(key.toString());
}
if (value == null) {
value = getExtraBytesProperty(key);
if (key.startsWith(ANNOTATION_AREA_PREFIX)) {
key = key.subSeq(ANNOTATION_AREA_PREFIX.length(), key.length());
return getAnnotation(key);
}
Object value = getObjectProperty(key);
if (value == null) {
TypedProperties extra = getExtraProperties();
if (extra != null) {
value = extra.getProperty(key);
}
}
return value;
}
@ -498,6 +504,9 @@ public abstract class AMQPMessage extends RefCountMessage implements org.apache.
}
protected void setMessageAnnotation(Symbol annotation, Object value) {
if (value instanceof SimpleString) {
value = value.toString();
}
getMessageAnnotationsMap(true).put(annotation, value);
}
@ -1278,6 +1287,25 @@ public abstract class AMQPMessage extends RefCountMessage implements org.apache.
return this;
}
@Override
public org.apache.activemq.artemis.api.core.Message setBrokerProperty(SimpleString key, Object value) {
// Annotation names have to start with x-opt
setMessageAnnotation(AMQPMessageSupport.toAnnotationName(key.toString()), value);
createExtraProperties().putProperty(key, value);
return this;
}
@Override
public Object getBrokerProperty(SimpleString key) {
TypedProperties extra = getExtraProperties();
if (extra == null) {
return null;
}
return extra.getProperty(key);
}
// JMS Style property access methods. These can result in additional decode of AMQP message
// data from Application properties. Updates to application properties puts the message in a
// dirty state and requires a re-encode of the data to update all buffer state data otherwise

View File

@ -35,6 +35,7 @@ import javax.jms.TemporaryQueue;
import javax.jms.TemporaryTopic;
import javax.jms.Topic;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.message.impl.CoreMessage;
import org.apache.activemq.artemis.core.persistence.CoreMessageObjectPools;
import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
@ -63,6 +64,9 @@ public final class AMQPMessageSupport {
private static final Logger logger = Logger.getLogger(AMQPMessageSupport.class);
public static SimpleString HDR_ORIGINAL_ADDRESS_ANNOTATION = SimpleString.toSimpleString("x-opt-ORIG-ADDRESS");
public static final String JMS_REPLY_TO_TYPE_MSG_ANNOTATION_SYMBOL_NAME = "x-opt-jms-reply-to";
// Message Properties used to map AMQP to JMS and back
@ -178,6 +182,9 @@ public final class AMQPMessageSupport {
public static final Binary EMPTY_BINARY = new Binary(new byte[0]);
public static final Data EMPTY_BODY = new Data(EMPTY_BINARY);
public static final String X_OPT_PREFIX = "x-opt-";
public static final String AMQ_PROPERTY_PREFIX = "_AMQ_";
public static final short AMQP_UNKNOWN = 0;
public static final short AMQP_NULL = 1;
public static final short AMQP_DATA = 2;
@ -285,6 +292,18 @@ public final class AMQPMessageSupport {
}
}
public static String toAnnotationName(String key) {
if (!key.startsWith(X_OPT_PREFIX.toString())) {
if (key.startsWith(AMQ_PROPERTY_PREFIX)) {
return X_OPT_PREFIX.concat(key.substring(AMQ_PROPERTY_PREFIX.length()).replace('_', '-'));
}
return key;
}
return key;
}
public static String toAddress(Destination destination) {
try {
if (destination instanceof ActiveMQDestination) {

View File

@ -0,0 +1,36 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.protocol.amqp.converter;
import org.apache.activemq.artemis.api.core.Message;
import org.junit.Assert;
import org.junit.Test;
public class AnnotationNameConveterTest {
@Test
public void testAnnotationName() {
try {
Assert.assertEquals("x-opt-ORIG-QUEUE", AMQPMessageSupport.toAnnotationName(Message.HDR_ORIGINAL_QUEUE.toString()));
Assert.assertEquals("x-opt-ORIG-MESSAGE-ID", AMQPMessageSupport.toAnnotationName(Message.HDR_ORIG_MESSAGE_ID.toString()));
} catch (Exception e) {
e.printStackTrace();
}
}
}

View File

@ -3384,7 +3384,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
copy.setExpiration(0);
if (expiry) {
copy.setAnnotation(Message.HDR_ACTUAL_EXPIRY_TIME, System.currentTimeMillis());
copy.setBrokerProperty(Message.HDR_ACTUAL_EXPIRY_TIME, System.currentTimeMillis());
}
copy.reencode();

View File

@ -128,6 +128,45 @@ message for later delivery:
If both annotations are present in the same message then the broker will prefer
the more specific `x-opt-delivery-time` value.
## DLQ and Expiry transfer
AMQP Messages will be copied before transferred to a DLQ or ExpiryQueue and will receive properties and annotations during this process.
The broker also keeps an internal only property (called extra property) that is not exposed to the clients, and those will also be filled during this process.
Here is a list of Annotations and Property names AMQP Messages will receive when transferred:
|Annotation name| Internal Property Name|Description|
|---------------|-----------------------|-----------|
|x-opt-ORIG-MESSAGE-ID|_AMQ_ORIG_MESSAGE_ID|The original message ID before the transfer|
|x-opt-ACTUAL-EXPIRY|_AMQ_ACTUAL_EXPIRY|When the expiry took place. Milliseconds since epoch times|
|x-opt-ORIG-QUEUE|_AMQ_ORIG_QUEUE|The original queue name before the transfer|
|x-opt-ORIG-ADDRESS|_AMQ_ORIG_ADDRESS|The original address name before the transfer|
## Filtering on Message Annotations
It is possible to filter on messaging annotations if you use the prefix "m." before the annotation name.
For example if you want to filter messages sent to a specific destination, you could create your filter accordingly to this:
```java
ConnectionFactory factory = new JmsConnectionFactory("amqp://localhost:5672");
Connection connection = factory.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
connection.start();
javax.jms.Queue queue = session.createQueue("my-DLQ");
MessageConsumer consumer = session.createConsumer(queue, "\"m.x-opt-ORIG-ADDRESS\"='ORIGINAL_PLACE'");
Message message = consumer.receive();
```
The broker will set internal properties. If you intend to filter after DLQ or Expiry you may choose the internal property names:
```java
// Replace the consumer creation on the previous example:
MessageConsumer consumer = session.createConsumer(queue, "_AMQ_ORIG_ADDRESS='ORIGINAL_PLACE'");
```
## Configuring AMQP Idle Timeout
It is possible to configure the AMQP Server's IDLE Timeout by setting the property amqpIdleTimeout in milliseconds on the acceptor.

View File

@ -16,10 +16,18 @@
*/
package org.apache.activemq.artemis.tests.integration.amqp;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.tests.util.CFUtil;
import org.apache.activemq.artemis.tests.util.Wait;
import org.apache.activemq.transport.amqp.client.AmqpClient;
import org.apache.activemq.transport.amqp.client.AmqpConnection;
@ -168,10 +176,20 @@ public class AmqpExpiredMessageTest extends AmqpClientTestSupport {
connection = addConnection(client.connect());
session = connection.createSession();
AmqpReceiver receiverDLQ = session.createReceiver(getDeadLetterAddress(), "_AMQ_ORIG_ADDRESS='" + getQueueName() + "'");
AmqpReceiver receiverDLQ = session.createReceiver(getDeadLetterAddress(), "\"m.x-opt-ORIG-ADDRESS\"='" + getQueueName() + "'");
receiverDLQ.flow(1);
received = receiverDLQ.receive(5, TimeUnit.SECONDS);
Assert.assertNotNull(received);
Assert.assertEquals(getQueueName(), received.getMessageAnnotation("x-opt-ORIG-ADDRESS"));
// close without accepting on purpose, it will issue a redelivery on the second filter
receiverDLQ.close();
// Redo the selection, however now using the extra-properties, since the broker will store these as extra properties on AMQP Messages
receiverDLQ = session.createReceiver(getDeadLetterAddress(), "_AMQ_ORIG_ADDRESS='" + getQueueName() + "'");
receiverDLQ.flow(1);
received = receiverDLQ.receive(5, TimeUnit.SECONDS);
Assert.assertEquals(getQueueName(), received.getMessageAnnotation("x-opt-ORIG-ADDRESS"));
Assert.assertNotNull(received);
received.accept();
assertNotNull("Should have read message from DLQ", received);
@ -182,6 +200,44 @@ public class AmqpExpiredMessageTest extends AmqpClientTestSupport {
connection.close();
}
/** This test is validating a broker feature where the message copy through the DLQ will receive an annotation.
* It is also testing filter on that annotation. */
@Test(timeout = 60000)
public void testExpiryQpidJMS() throws Exception {
ConnectionFactory factory = CFUtil.createConnectionFactory("AMQP", getBrokerAmqpConnectionURI().toString());
Connection connection = factory.createConnection();
try {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
javax.jms.Queue queue = session.createQueue(getQueueName());
MessageProducer sender = session.createProducer(queue);
// Get the Queue View early to avoid racing the delivery.
final Queue queueView = getProxyToQueue(getQueueName());
assertNotNull(queueView);
sender.setTimeToLive(1);
TextMessage message = session.createTextMessage("Test-Message");
message.setStringProperty("key1", "Value1");
sender.send(message);
sender.close();
Wait.assertEquals(1, queueView::getMessagesExpired);
final Queue dlqView = getProxyToQueue(getDeadLetterAddress());
assertNotNull(dlqView);
Wait.assertEquals(1, dlqView::getMessageCount);
connection.start();
javax.jms.Queue queueDLQ = session.createQueue(getDeadLetterAddress());
MessageConsumer receiverDLQ = session.createConsumer(queueDLQ, "\"m.x-opt-ORIG-ADDRESS\"='" + getQueueName() + "'");
Message received = receiverDLQ.receive(5000);
Assert.assertNotNull(received);
receiverDLQ.close();
} finally {
connection.close();
}
}
@Test(timeout = 60000)
public void testSendMessageThatIsNotExpiredUsingAbsoluteTime() throws Exception {
AmqpClient client = createAmqpClient();
@ -261,14 +317,12 @@ public class AmqpExpiredMessageTest extends AmqpClientTestSupport {
AmqpMessage message = new AmqpMessage();
message.setAbsoluteExpiryTime(0);
// AET should override any TTL set
message.setTimeToLive(1000);
message.setTimeToLive(100);
message.setText("Test-Message");
sender.send(message);
sender.close();
Wait.assertEquals(1, queueView::getMessageCount);
Thread.sleep(1000);
Wait.assertEquals(1L, queueView::getMessagesExpired, 10000, 10);
// Now try and get the message
AmqpReceiver receiver = session.createReceiver(getQueueName());
@ -426,7 +480,7 @@ public class AmqpExpiredMessageTest extends AmqpClientTestSupport {
message = receiver.receive(5, TimeUnit.SECONDS);
assertNotNull(message);
assertEquals(getQueueName(), message.getMessageAnnotation(org.apache.activemq.artemis.api.core.Message.HDR_ORIGINAL_ADDRESS.toString()));
assertEquals(getQueueName(), message.getMessageAnnotation("x-opt-ORIG-QUEUE"));
assertNull(message.getDeliveryAnnotation("shouldDisappear"));
assertNull(receiver.receiveNoWait());
} finally {

View File

@ -396,19 +396,19 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
AmqpMessage message = new AmqpMessage();
message.setMessageId("msg" + 1);
message.setMessageAnnotation("serialNo", 1);
message.setMessageAnnotation("x-opt-serialNo", 1);
message.setText("Test-Message");
sender.send(message);
message = new AmqpMessage();
message.setMessageId("msg" + 2);
message.setMessageAnnotation("serialNo", 2);
message.setMessageAnnotation("x-opt-serialNo", 2);
message.setText("Test-Message 2");
sender.send(message);
sender.close();
LOG.debug("Attempting to read message with receiver");
AmqpReceiver receiver = session.createReceiver(getQueueName(), "serialNo=2");
AmqpReceiver receiver = session.createReceiver(getQueueName(), "\"m.x-opt-serialNo\"=2");
receiver.flow(2);
AmqpMessage received = receiver.receive(10, TimeUnit.SECONDS);
assertNotNull("Should have read message", received);

View File

@ -0,0 +1,162 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.amqp;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport;
import org.apache.activemq.artemis.tests.util.Wait;
import org.apache.activemq.transport.amqp.client.AmqpClient;
import org.apache.activemq.transport.amqp.client.AmqpConnection;
import org.apache.activemq.transport.amqp.client.AmqpMessage;
import org.apache.activemq.transport.amqp.client.AmqpReceiver;
import org.apache.activemq.transport.amqp.client.AmqpSender;
import org.apache.activemq.transport.amqp.client.AmqpSession;
import org.apache.qpid.proton.amqp.Symbol;
import org.jboss.logging.Logger;
import org.junit.Assert;
import org.junit.Test;
/**
* This is testing a double transfer (copy).
* First messages will expire, then DLQ.
* This will validate the data added to the queues.
*/
public class DLQAfterExpiredMessageTest extends AmqpClientTestSupport {
private static final Logger log = Logger.getLogger(DLQAfterExpiredMessageTest.class);
protected String getExpiryQueue() {
return "ActiveMQ.Expiry";
}
@Override
protected void createAddressAndQueues(ActiveMQServer server) throws Exception {
// Default Queue
server.addAddressInfo(new AddressInfo(SimpleString.toSimpleString(getQueueName()), RoutingType.ANYCAST));
server.createQueue(new QueueConfiguration(getQueueName()).setRoutingType(RoutingType.ANYCAST));
// Default DLQ
server.addAddressInfo(new AddressInfo(SimpleString.toSimpleString(getDeadLetterAddress()), RoutingType.ANYCAST));
server.createQueue(new QueueConfiguration(getDeadLetterAddress()).setRoutingType(RoutingType.ANYCAST));
// Expiry
server.addAddressInfo(new AddressInfo(SimpleString.toSimpleString(getExpiryQueue()), RoutingType.ANYCAST));
server.createQueue(new QueueConfiguration(getExpiryQueue()).setRoutingType(RoutingType.ANYCAST));
// Default Topic
server.addAddressInfo(new AddressInfo(SimpleString.toSimpleString(getTopicName()), RoutingType.MULTICAST));
server.createQueue(new QueueConfiguration(getTopicName()));
// Additional Test Queues
for (int i = 0; i < getPrecreatedQueueSize(); ++i) {
server.addAddressInfo(new AddressInfo(SimpleString.toSimpleString(getQueueName(i)), RoutingType.ANYCAST));
server.createQueue(new QueueConfiguration(getQueueName(i)).setRoutingType(RoutingType.ANYCAST));
}
}
@Override
protected void configureAddressPolicy(ActiveMQServer server) {
// Address configuration
AddressSettings addressSettings = new AddressSettings();
addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
addressSettings.setAutoCreateQueues(isAutoCreateQueues());
addressSettings.setAutoCreateAddresses(isAutoCreateAddresses());
addressSettings.setDeadLetterAddress(SimpleString.toSimpleString(getDeadLetterAddress()));
addressSettings.setExpiryAddress(SimpleString.toSimpleString(getExpiryQueue()));
addressSettings.setMaxDeliveryAttempts(1);
server.getConfiguration().getAddressesSettings().put("#", addressSettings);
server.getConfiguration().getAddressesSettings().put(getExpiryQueue(), addressSettings);
}
@Test
public void testDoubleTransfer() throws Throwable {
AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect());
try {
AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender(getQueueName());
// Get the Queue View early to avoid racing the delivery.
final Queue queueView = getProxyToQueue(getQueueName());
assertNotNull(queueView);
AmqpMessage message = new AmqpMessage();
message.setTimeToLive(1);
message.setText("Test-Message");
message.setDurable(true);
message.setApplicationProperty("key1", "Value1");
sender.send(message);
sender.close();
Wait.assertEquals(1, queueView::getMessagesExpired);
Wait.assertEquals(0, queueView::getConsumerCount);
final Queue expiryView = getProxyToQueue(getExpiryQueue());
assertNotNull(expiryView);
Wait.assertEquals(1, expiryView::getMessageCount);
HashMap<String, Object> annotations = new HashMap<>();
AmqpReceiver receiverDLQ = session.createReceiver(getExpiryQueue(), "\"m." + AMQPMessageSupport.HDR_ORIGINAL_ADDRESS_ANNOTATION + "\"='" + getQueueName() + "'");
receiverDLQ.flow(1);
AmqpMessage received = receiverDLQ.receive(5, TimeUnit.SECONDS);
Assert.assertNotNull(received);
Map<Symbol, Object> avAnnotations = received.getWrappedMessage().getMessageAnnotations().getValue();
avAnnotations.forEach((key, value) -> {
annotations.put(key.toString(), value);
});
received.reject();
receiverDLQ.close();
// Redo the selection
receiverDLQ = session.createReceiver(getDeadLetterAddress(), "\"m." + AMQPMessageSupport.HDR_ORIGINAL_ADDRESS_ANNOTATION + "\"='" + getQueueName() + "'");
receiverDLQ.flow(1);
received = receiverDLQ.receive(5, TimeUnit.SECONDS);
Assert.assertNotNull(received);
received.accept();
/** When moving to DLQ, the original headers shoudln't be touched. */
for (Map.Entry<String, Object> entry : annotations.entrySet()) {
log.debug("Checking " + entry.getKey() + " = " + entry.getValue());
Assert.assertEquals(entry.getKey() + " should be = " + entry.getValue(), entry.getValue(), received.getMessageAnnotation(entry.getKey()));
}
assertEquals(0, received.getTimeToLive());
assertNotNull(received);
assertEquals("Value1", received.getApplicationProperty("key1"));
} catch (Throwable e) {
e.printStackTrace();
throw e;
} finally {
connection.close();
}
}
}