diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/TypedProperties.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/TypedProperties.java index 81755ae154..3f1baf1dce 100644 --- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/TypedProperties.java +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/TypedProperties.java @@ -148,6 +148,11 @@ public class TypedProperties { otherProps.forEachInternal(this::doPutValue); } + public TypedProperties putProperty(final SimpleString key, final Object value) { + setObjectProperty(key, value, this); + return this; + } + public Object getProperty(final SimpleString key) { return doGetProperty(key); } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java index 568cdda65b..b8f630394e 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java @@ -463,24 +463,24 @@ public interface Message { } default void referenceOriginalMessage(final Message original, String originalQueue) { - String queueOnMessage = original.getAnnotationString(Message.HDR_ORIGINAL_QUEUE); + Object queueOnMessage = original.getBrokerProperty(Message.HDR_ORIGINAL_QUEUE); if (queueOnMessage != null) { - setAnnotation(Message.HDR_ORIGINAL_QUEUE, queueOnMessage); + setBrokerProperty(Message.HDR_ORIGINAL_QUEUE, queueOnMessage); } else if (originalQueue != null) { - setAnnotation(Message.HDR_ORIGINAL_QUEUE, originalQueue); + setBrokerProperty(Message.HDR_ORIGINAL_QUEUE, originalQueue); } - Object originalID = original.getAnnotation(Message.HDR_ORIG_MESSAGE_ID); + Object originalID = original.getBrokerProperty(Message.HDR_ORIG_MESSAGE_ID); if (originalID != null) { - setAnnotation(Message.HDR_ORIGINAL_ADDRESS, original.getAnnotationString(Message.HDR_ORIGINAL_ADDRESS)); + setBrokerProperty(Message.HDR_ORIGINAL_ADDRESS, original.getBrokerProperty(Message.HDR_ORIGINAL_ADDRESS)); - setAnnotation(Message.HDR_ORIG_MESSAGE_ID, originalID); + setBrokerProperty(Message.HDR_ORIG_MESSAGE_ID, originalID); } else { - setAnnotation(Message.HDR_ORIGINAL_ADDRESS, original.getAddress()); + setBrokerProperty(Message.HDR_ORIGINAL_ADDRESS, original.getAddress()); - setAnnotation(Message.HDR_ORIG_MESSAGE_ID, original.getMessageID()); + setBrokerProperty(Message.HDR_ORIG_MESSAGE_ID, original.getMessageID()); } // reset expiry @@ -641,6 +641,17 @@ public interface Message { return this; } + /** To be called by the broker on ocasions such as DLQ and expiry. + * When the broker is adding additional properties. */ + default Message setBrokerProperty(SimpleString key, Object value) { + putObjectProperty(key, value); + return this; + } + + default Object getBrokerProperty(SimpleString key) { + return getObjectProperty(key); + } + Short getShortProperty(SimpleString key) throws ActiveMQPropertyConversionException; Float getFloatProperty(SimpleString key) throws ActiveMQPropertyConversionException; diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java index e0cd94b17e..6e903a3ade 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java @@ -104,6 +104,8 @@ import org.jboss.logging.Logger; */ public abstract class AMQPMessage extends RefCountMessage implements org.apache.activemq.artemis.api.core.Message { + private static final SimpleString ANNOTATION_AREA_PREFIX = SimpleString.toSimpleString("m."); + protected static final Logger logger = Logger.getLogger(AMQPMessage.class); public static final SimpleString ADDRESS_PROPERTY = SimpleString.toSimpleString("_AMQ_AD"); @@ -275,14 +277,18 @@ public abstract class AMQPMessage extends RefCountMessage implements org.apache. @Override public Object getObjectPropertyForFilter(SimpleString key) { - Object value = getObjectProperty(key); - if (value == null) { - value = getMessageAnnotation(key.toString()); - } - if (value == null) { - value = getExtraBytesProperty(key); + if (key.startsWith(ANNOTATION_AREA_PREFIX)) { + key = key.subSeq(ANNOTATION_AREA_PREFIX.length(), key.length()); + return getAnnotation(key); } + Object value = getObjectProperty(key); + if (value == null) { + TypedProperties extra = getExtraProperties(); + if (extra != null) { + value = extra.getProperty(key); + } + } return value; } @@ -498,6 +504,9 @@ public abstract class AMQPMessage extends RefCountMessage implements org.apache. } protected void setMessageAnnotation(Symbol annotation, Object value) { + if (value instanceof SimpleString) { + value = value.toString(); + } getMessageAnnotationsMap(true).put(annotation, value); } @@ -1278,6 +1287,25 @@ public abstract class AMQPMessage extends RefCountMessage implements org.apache. return this; } + + @Override + public org.apache.activemq.artemis.api.core.Message setBrokerProperty(SimpleString key, Object value) { + // Annotation names have to start with x-opt + setMessageAnnotation(AMQPMessageSupport.toAnnotationName(key.toString()), value); + createExtraProperties().putProperty(key, value); + return this; + } + + @Override + public Object getBrokerProperty(SimpleString key) { + TypedProperties extra = getExtraProperties(); + if (extra == null) { + return null; + } + return extra.getProperty(key); + } + + // JMS Style property access methods. These can result in additional decode of AMQP message // data from Application properties. Updates to application properties puts the message in a // dirty state and requires a re-encode of the data to update all buffer state data otherwise diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AMQPMessageSupport.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AMQPMessageSupport.java index dd81c4c34c..abc6392490 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AMQPMessageSupport.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AMQPMessageSupport.java @@ -35,6 +35,7 @@ import javax.jms.TemporaryQueue; import javax.jms.TemporaryTopic; import javax.jms.Topic; +import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.message.impl.CoreMessage; import org.apache.activemq.artemis.core.persistence.CoreMessageObjectPools; import org.apache.activemq.artemis.jms.client.ActiveMQDestination; @@ -63,6 +64,9 @@ public final class AMQPMessageSupport { private static final Logger logger = Logger.getLogger(AMQPMessageSupport.class); + + public static SimpleString HDR_ORIGINAL_ADDRESS_ANNOTATION = SimpleString.toSimpleString("x-opt-ORIG-ADDRESS"); + public static final String JMS_REPLY_TO_TYPE_MSG_ANNOTATION_SYMBOL_NAME = "x-opt-jms-reply-to"; // Message Properties used to map AMQP to JMS and back @@ -178,6 +182,9 @@ public final class AMQPMessageSupport { public static final Binary EMPTY_BINARY = new Binary(new byte[0]); public static final Data EMPTY_BODY = new Data(EMPTY_BINARY); + public static final String X_OPT_PREFIX = "x-opt-"; + public static final String AMQ_PROPERTY_PREFIX = "_AMQ_"; + public static final short AMQP_UNKNOWN = 0; public static final short AMQP_NULL = 1; public static final short AMQP_DATA = 2; @@ -285,6 +292,18 @@ public final class AMQPMessageSupport { } } + public static String toAnnotationName(String key) { + if (!key.startsWith(X_OPT_PREFIX.toString())) { + if (key.startsWith(AMQ_PROPERTY_PREFIX)) { + return X_OPT_PREFIX.concat(key.substring(AMQ_PROPERTY_PREFIX.length()).replace('_', '-')); + } + + return key; + } + return key; + } + + public static String toAddress(Destination destination) { try { if (destination instanceof ActiveMQDestination) { diff --git a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/AnnotationNameConveterTest.java b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/AnnotationNameConveterTest.java new file mode 100644 index 0000000000..2de0ed315d --- /dev/null +++ b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/AnnotationNameConveterTest.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.protocol.amqp.converter; + +import org.apache.activemq.artemis.api.core.Message; +import org.junit.Assert; +import org.junit.Test; + +public class AnnotationNameConveterTest { + + @Test + public void testAnnotationName() { + try { + Assert.assertEquals("x-opt-ORIG-QUEUE", AMQPMessageSupport.toAnnotationName(Message.HDR_ORIGINAL_QUEUE.toString())); + Assert.assertEquals("x-opt-ORIG-MESSAGE-ID", AMQPMessageSupport.toAnnotationName(Message.HDR_ORIG_MESSAGE_ID.toString())); + } catch (Exception e) { + e.printStackTrace(); + } + } + +} diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java index 82e88707f0..0caf7ad657 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java @@ -3384,7 +3384,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { copy.setExpiration(0); if (expiry) { - copy.setAnnotation(Message.HDR_ACTUAL_EXPIRY_TIME, System.currentTimeMillis()); + copy.setBrokerProperty(Message.HDR_ACTUAL_EXPIRY_TIME, System.currentTimeMillis()); } copy.reencode(); diff --git a/docs/user-manual/en/amqp.md b/docs/user-manual/en/amqp.md index 5cff656598..f88594c0d4 100644 --- a/docs/user-manual/en/amqp.md +++ b/docs/user-manual/en/amqp.md @@ -128,6 +128,45 @@ message for later delivery: If both annotations are present in the same message then the broker will prefer the more specific `x-opt-delivery-time` value. +## DLQ and Expiry transfer + +AMQP Messages will be copied before transferred to a DLQ or ExpiryQueue and will receive properties and annotations during this process. + +The broker also keeps an internal only property (called extra property) that is not exposed to the clients, and those will also be filled during this process. + +Here is a list of Annotations and Property names AMQP Messages will receive when transferred: + +|Annotation name| Internal Property Name|Description| +|---------------|-----------------------|-----------| +|x-opt-ORIG-MESSAGE-ID|_AMQ_ORIG_MESSAGE_ID|The original message ID before the transfer| +|x-opt-ACTUAL-EXPIRY|_AMQ_ACTUAL_EXPIRY|When the expiry took place. Milliseconds since epoch times| +|x-opt-ORIG-QUEUE|_AMQ_ORIG_QUEUE|The original queue name before the transfer| +|x-opt-ORIG-ADDRESS|_AMQ_ORIG_ADDRESS|The original address name before the transfer| + +## Filtering on Message Annotations + +It is possible to filter on messaging annotations if you use the prefix "m." before the annotation name. + +For example if you want to filter messages sent to a specific destination, you could create your filter accordingly to this: + +```java +ConnectionFactory factory = new JmsConnectionFactory("amqp://localhost:5672"); +Connection connection = factory.createConnection(); +Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); +connection.start(); +javax.jms.Queue queue = session.createQueue("my-DLQ"); +MessageConsumer consumer = session.createConsumer(queue, "\"m.x-opt-ORIG-ADDRESS\"='ORIGINAL_PLACE'"); +Message message = consumer.receive(); +``` + +The broker will set internal properties. If you intend to filter after DLQ or Expiry you may choose the internal property names: + +```java +// Replace the consumer creation on the previous example: +MessageConsumer consumer = session.createConsumer(queue, "_AMQ_ORIG_ADDRESS='ORIGINAL_PLACE'"); +``` + + ## Configuring AMQP Idle Timeout It is possible to configure the AMQP Server's IDLE Timeout by setting the property amqpIdleTimeout in milliseconds on the acceptor. diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpExpiredMessageTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpExpiredMessageTest.java index 121137f431..30b3f7a408 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpExpiredMessageTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpExpiredMessageTest.java @@ -16,10 +16,18 @@ */ package org.apache.activemq.artemis.tests.integration.amqp; +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; import java.util.UUID; import java.util.concurrent.TimeUnit; import org.apache.activemq.artemis.core.server.Queue; +import org.apache.activemq.artemis.tests.util.CFUtil; import org.apache.activemq.artemis.tests.util.Wait; import org.apache.activemq.transport.amqp.client.AmqpClient; import org.apache.activemq.transport.amqp.client.AmqpConnection; @@ -168,10 +176,20 @@ public class AmqpExpiredMessageTest extends AmqpClientTestSupport { connection = addConnection(client.connect()); session = connection.createSession(); - AmqpReceiver receiverDLQ = session.createReceiver(getDeadLetterAddress(), "_AMQ_ORIG_ADDRESS='" + getQueueName() + "'"); + AmqpReceiver receiverDLQ = session.createReceiver(getDeadLetterAddress(), "\"m.x-opt-ORIG-ADDRESS\"='" + getQueueName() + "'"); receiverDLQ.flow(1); received = receiverDLQ.receive(5, TimeUnit.SECONDS); Assert.assertNotNull(received); + Assert.assertEquals(getQueueName(), received.getMessageAnnotation("x-opt-ORIG-ADDRESS")); + // close without accepting on purpose, it will issue a redelivery on the second filter + receiverDLQ.close(); + + // Redo the selection, however now using the extra-properties, since the broker will store these as extra properties on AMQP Messages + receiverDLQ = session.createReceiver(getDeadLetterAddress(), "_AMQ_ORIG_ADDRESS='" + getQueueName() + "'"); + receiverDLQ.flow(1); + received = receiverDLQ.receive(5, TimeUnit.SECONDS); + Assert.assertEquals(getQueueName(), received.getMessageAnnotation("x-opt-ORIG-ADDRESS")); + Assert.assertNotNull(received); received.accept(); assertNotNull("Should have read message from DLQ", received); @@ -182,6 +200,44 @@ public class AmqpExpiredMessageTest extends AmqpClientTestSupport { connection.close(); } + /** This test is validating a broker feature where the message copy through the DLQ will receive an annotation. + * It is also testing filter on that annotation. */ + @Test(timeout = 60000) + public void testExpiryQpidJMS() throws Exception { + ConnectionFactory factory = CFUtil.createConnectionFactory("AMQP", getBrokerAmqpConnectionURI().toString()); + Connection connection = factory.createConnection(); + try { + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + javax.jms.Queue queue = session.createQueue(getQueueName()); + MessageProducer sender = session.createProducer(queue); + + // Get the Queue View early to avoid racing the delivery. + final Queue queueView = getProxyToQueue(getQueueName()); + assertNotNull(queueView); + + sender.setTimeToLive(1); + TextMessage message = session.createTextMessage("Test-Message"); + message.setStringProperty("key1", "Value1"); + sender.send(message); + sender.close(); + + Wait.assertEquals(1, queueView::getMessagesExpired); + final Queue dlqView = getProxyToQueue(getDeadLetterAddress()); + assertNotNull(dlqView); + Wait.assertEquals(1, dlqView::getMessageCount); + + connection.start(); + javax.jms.Queue queueDLQ = session.createQueue(getDeadLetterAddress()); + MessageConsumer receiverDLQ = session.createConsumer(queueDLQ, "\"m.x-opt-ORIG-ADDRESS\"='" + getQueueName() + "'"); + Message received = receiverDLQ.receive(5000); + Assert.assertNotNull(received); + receiverDLQ.close(); + } finally { + connection.close(); + } + + } + @Test(timeout = 60000) public void testSendMessageThatIsNotExpiredUsingAbsoluteTime() throws Exception { AmqpClient client = createAmqpClient(); @@ -261,14 +317,12 @@ public class AmqpExpiredMessageTest extends AmqpClientTestSupport { AmqpMessage message = new AmqpMessage(); message.setAbsoluteExpiryTime(0); // AET should override any TTL set - message.setTimeToLive(1000); + message.setTimeToLive(100); message.setText("Test-Message"); sender.send(message); sender.close(); - Wait.assertEquals(1, queueView::getMessageCount); - - Thread.sleep(1000); + Wait.assertEquals(1L, queueView::getMessagesExpired, 10000, 10); // Now try and get the message AmqpReceiver receiver = session.createReceiver(getQueueName()); @@ -426,7 +480,7 @@ public class AmqpExpiredMessageTest extends AmqpClientTestSupport { message = receiver.receive(5, TimeUnit.SECONDS); assertNotNull(message); - assertEquals(getQueueName(), message.getMessageAnnotation(org.apache.activemq.artemis.api.core.Message.HDR_ORIGINAL_ADDRESS.toString())); + assertEquals(getQueueName(), message.getMessageAnnotation("x-opt-ORIG-QUEUE")); assertNull(message.getDeliveryAnnotation("shouldDisappear")); assertNull(receiver.receiveNoWait()); } finally { diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java index 85c304c02f..b75a293f23 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java @@ -396,19 +396,19 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport { AmqpMessage message = new AmqpMessage(); message.setMessageId("msg" + 1); - message.setMessageAnnotation("serialNo", 1); + message.setMessageAnnotation("x-opt-serialNo", 1); message.setText("Test-Message"); sender.send(message); message = new AmqpMessage(); message.setMessageId("msg" + 2); - message.setMessageAnnotation("serialNo", 2); + message.setMessageAnnotation("x-opt-serialNo", 2); message.setText("Test-Message 2"); sender.send(message); sender.close(); LOG.debug("Attempting to read message with receiver"); - AmqpReceiver receiver = session.createReceiver(getQueueName(), "serialNo=2"); + AmqpReceiver receiver = session.createReceiver(getQueueName(), "\"m.x-opt-serialNo\"=2"); receiver.flow(2); AmqpMessage received = receiver.receive(10, TimeUnit.SECONDS); assertNotNull("Should have read message", received); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/DLQAfterExpiredMessageTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/DLQAfterExpiredMessageTest.java new file mode 100644 index 0000000000..ce8fd56e63 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/DLQAfterExpiredMessageTest.java @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.amqp; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import org.apache.activemq.artemis.api.core.QueueConfiguration; +import org.apache.activemq.artemis.api.core.RoutingType; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.Queue; +import org.apache.activemq.artemis.core.server.impl.AddressInfo; +import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy; +import org.apache.activemq.artemis.core.settings.impl.AddressSettings; +import org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport; +import org.apache.activemq.artemis.tests.util.Wait; +import org.apache.activemq.transport.amqp.client.AmqpClient; +import org.apache.activemq.transport.amqp.client.AmqpConnection; +import org.apache.activemq.transport.amqp.client.AmqpMessage; +import org.apache.activemq.transport.amqp.client.AmqpReceiver; +import org.apache.activemq.transport.amqp.client.AmqpSender; +import org.apache.activemq.transport.amqp.client.AmqpSession; +import org.apache.qpid.proton.amqp.Symbol; +import org.jboss.logging.Logger; +import org.junit.Assert; +import org.junit.Test; + +/** + * This is testing a double transfer (copy). + * First messages will expire, then DLQ. + * This will validate the data added to the queues. + */ +public class DLQAfterExpiredMessageTest extends AmqpClientTestSupport { + private static final Logger log = Logger.getLogger(DLQAfterExpiredMessageTest.class); + + protected String getExpiryQueue() { + return "ActiveMQ.Expiry"; + } + + @Override + protected void createAddressAndQueues(ActiveMQServer server) throws Exception { + // Default Queue + server.addAddressInfo(new AddressInfo(SimpleString.toSimpleString(getQueueName()), RoutingType.ANYCAST)); + server.createQueue(new QueueConfiguration(getQueueName()).setRoutingType(RoutingType.ANYCAST)); + + // Default DLQ + server.addAddressInfo(new AddressInfo(SimpleString.toSimpleString(getDeadLetterAddress()), RoutingType.ANYCAST)); + server.createQueue(new QueueConfiguration(getDeadLetterAddress()).setRoutingType(RoutingType.ANYCAST)); + + // Expiry + server.addAddressInfo(new AddressInfo(SimpleString.toSimpleString(getExpiryQueue()), RoutingType.ANYCAST)); + server.createQueue(new QueueConfiguration(getExpiryQueue()).setRoutingType(RoutingType.ANYCAST)); + + // Default Topic + server.addAddressInfo(new AddressInfo(SimpleString.toSimpleString(getTopicName()), RoutingType.MULTICAST)); + server.createQueue(new QueueConfiguration(getTopicName())); + + // Additional Test Queues + for (int i = 0; i < getPrecreatedQueueSize(); ++i) { + server.addAddressInfo(new AddressInfo(SimpleString.toSimpleString(getQueueName(i)), RoutingType.ANYCAST)); + server.createQueue(new QueueConfiguration(getQueueName(i)).setRoutingType(RoutingType.ANYCAST)); + } + } + + @Override + protected void configureAddressPolicy(ActiveMQServer server) { + // Address configuration + AddressSettings addressSettings = new AddressSettings(); + + addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE); + addressSettings.setAutoCreateQueues(isAutoCreateQueues()); + addressSettings.setAutoCreateAddresses(isAutoCreateAddresses()); + addressSettings.setDeadLetterAddress(SimpleString.toSimpleString(getDeadLetterAddress())); + addressSettings.setExpiryAddress(SimpleString.toSimpleString(getExpiryQueue())); + addressSettings.setMaxDeliveryAttempts(1); + server.getConfiguration().getAddressesSettings().put("#", addressSettings); + server.getConfiguration().getAddressesSettings().put(getExpiryQueue(), addressSettings); + } + + @Test + public void testDoubleTransfer() throws Throwable { + + AmqpClient client = createAmqpClient(); + AmqpConnection connection = addConnection(client.connect()); + try { + AmqpSession session = connection.createSession(); + + AmqpSender sender = session.createSender(getQueueName()); + + // Get the Queue View early to avoid racing the delivery. + final Queue queueView = getProxyToQueue(getQueueName()); + assertNotNull(queueView); + + AmqpMessage message = new AmqpMessage(); + message.setTimeToLive(1); + message.setText("Test-Message"); + message.setDurable(true); + message.setApplicationProperty("key1", "Value1"); + sender.send(message); + sender.close(); + + Wait.assertEquals(1, queueView::getMessagesExpired); + Wait.assertEquals(0, queueView::getConsumerCount); + + final Queue expiryView = getProxyToQueue(getExpiryQueue()); + assertNotNull(expiryView); + Wait.assertEquals(1, expiryView::getMessageCount); + + HashMap annotations = new HashMap<>(); + + AmqpReceiver receiverDLQ = session.createReceiver(getExpiryQueue(), "\"m." + AMQPMessageSupport.HDR_ORIGINAL_ADDRESS_ANNOTATION + "\"='" + getQueueName() + "'"); + receiverDLQ.flow(1); + AmqpMessage received = receiverDLQ.receive(5, TimeUnit.SECONDS); + Assert.assertNotNull(received); + Map avAnnotations = received.getWrappedMessage().getMessageAnnotations().getValue(); + avAnnotations.forEach((key, value) -> { + annotations.put(key.toString(), value); + }); + received.reject(); + receiverDLQ.close(); + + + // Redo the selection + receiverDLQ = session.createReceiver(getDeadLetterAddress(), "\"m." + AMQPMessageSupport.HDR_ORIGINAL_ADDRESS_ANNOTATION + "\"='" + getQueueName() + "'"); + receiverDLQ.flow(1); + received = receiverDLQ.receive(5, TimeUnit.SECONDS); + Assert.assertNotNull(received); + received.accept(); + + /** When moving to DLQ, the original headers shoudln't be touched. */ + for (Map.Entry entry : annotations.entrySet()) { + log.debug("Checking " + entry.getKey() + " = " + entry.getValue()); + Assert.assertEquals(entry.getKey() + " should be = " + entry.getValue(), entry.getValue(), received.getMessageAnnotation(entry.getKey())); + } + + assertEquals(0, received.getTimeToLive()); + assertNotNull(received); + assertEquals("Value1", received.getApplicationProperty("key1")); + } catch (Throwable e) { + e.printStackTrace(); + throw e; + } finally { + connection.close(); + } + } +}