From 1d686f001a6c32c862ba84d06f75ff33c207ebe5 Mon Sep 17 00:00:00 2001 From: Patrick Deasy Date: Fri, 15 Nov 2024 10:15:40 -0800 Subject: [PATCH] AMQ-7397 Add preserveDeliveryMode to deadLetterStrategy. --- .../activemq/broker/region/RegionBroker.java | 30 +----------- .../policy/AbstractDeadLetterStrategy.java | 46 +++++++++++++++++++ .../region/policy/DeadLetterStrategy.java | 21 +++++++++ 3 files changed, 69 insertions(+), 28 deletions(-) diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java index 844fa12029..31d4091c5f 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java @@ -81,7 +81,6 @@ import org.slf4j.LoggerFactory; * Routes Broker operations to the correct messaging regions for processing. */ public class RegionBroker extends EmptyBroker { - public static final String ORIGINAL_EXPIRATION = "originalExpiration"; private static final Logger LOG = LoggerFactory.getLogger(RegionBroker.class); private static final IdGenerator BROKER_ID_GENERATOR = new IdGenerator(); @@ -749,16 +748,6 @@ public class RegionBroker extends EmptyBroker { return messageReference.canProcessAsExpired(); } - private boolean stampAsExpired(Message message) throws IOException { - boolean stamped = false; - if (message.getProperty(ORIGINAL_EXPIRATION) == null) { - long expiration = message.getExpiration(); - message.setProperty(ORIGINAL_EXPIRATION, expiration); - stamped = true; - } - return stamped; - } - @Override public void messageExpired(ConnectionContext context, MessageReference node, Subscription subscription) { LOG.debug("Message expired {}", node); @@ -781,23 +770,8 @@ public class RegionBroker extends EmptyBroker { return false; } - // message may be inflight to other subscriptions so do not modify - message = message.copy(); - long dlqExpiration = deadLetterStrategy.getExpiration(); - if (dlqExpiration > 0) { - dlqExpiration += System.currentTimeMillis(); - } else { - stampAsExpired(message); - } - message.setExpiration(dlqExpiration); - if (!message.isPersistent()) { - message.setPersistent(true); - message.setProperty("originalDeliveryMode", "NON_PERSISTENT"); - } - if (poisonCause != null) { - message.setProperty(ActiveMQMessage.DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY, - poisonCause.toString()); - } + message = deadLetterStrategy.prepareMessageForDeadLetterQueue(message, poisonCause); + // The original destination and transaction id do // not get filled when the message is first sent, // it is only populated if the message is routed to diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/AbstractDeadLetterStrategy.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/AbstractDeadLetterStrategy.java index 2421206810..9d416696a2 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/AbstractDeadLetterStrategy.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/AbstractDeadLetterStrategy.java @@ -17,19 +17,24 @@ package org.apache.activemq.broker.region.policy; import org.apache.activemq.ActiveMQMessageAudit; +import org.apache.activemq.command.ActiveMQMessage; import org.apache.activemq.command.Message; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; + /** * A strategy for choosing which destination is used for dead letter queue * messages. * */ public abstract class AbstractDeadLetterStrategy implements DeadLetterStrategy { + public static final String ORIGINAL_EXPIRATION = "originalExpiration"; private static final Logger LOG = LoggerFactory.getLogger(AbstractDeadLetterStrategy.class); private boolean processNonPersistent = false; private boolean processExpired = true; + private boolean preserveDeliveryMode = false; private boolean enableAudit = true; private long expiration; @@ -59,6 +64,39 @@ public abstract class AbstractDeadLetterStrategy implements DeadLetterStrategy { return result; } + @Override + public Message prepareMessageForDeadLetterQueue(Message message, Throwable poisonCause) throws IOException { + // message may be inflight to other subscriptions so do not modify + message = message.copy(); + long dlqExpiration = expiration; + if (dlqExpiration > 0) { + dlqExpiration += System.currentTimeMillis(); + } else { + stampAsExpired(message); + } + message.setExpiration(dlqExpiration); + if (!message.isPersistent() && !preserveDeliveryMode) { + message.setPersistent(true); + message.setProperty("originalDeliveryMode", "NON_PERSISTENT"); + } + if (poisonCause != null) { + message.setProperty(ActiveMQMessage.DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY, + poisonCause.toString()); + } + + return message; + } + + private boolean stampAsExpired(Message message) throws IOException { + boolean stamped = false; + if (message.getProperty(ORIGINAL_EXPIRATION) == null) { + long expiration = message.getExpiration(); + message.setProperty(ORIGINAL_EXPIRATION, expiration); + stamped = true; + } + return stamped; + } + /** * @return the processExpired */ @@ -91,6 +129,14 @@ public abstract class AbstractDeadLetterStrategy implements DeadLetterStrategy { this.processNonPersistent = processNonPersistent; } + @Override + public boolean isPreserveDeliveryMode() { return this.preserveDeliveryMode; } + + @Override + public void setPreserveDeliveryMode(boolean preserveDeliveryMode) { + this.preserveDeliveryMode = preserveDeliveryMode; + } + public boolean isEnableAudit() { return enableAudit; } diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/DeadLetterStrategy.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/DeadLetterStrategy.java index fdd11740db..0fa03d2da5 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/DeadLetterStrategy.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/DeadLetterStrategy.java @@ -20,6 +20,8 @@ import org.apache.activemq.broker.region.Subscription; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.Message; +import java.io.IOException; + /** * A strategy for choosing which destination is used for dead letter queue messages. * @@ -35,6 +37,15 @@ public interface DeadLetterStrategy { */ boolean isSendToDeadLetterQueue(Message message); + /** + * Allow pluggable strategy for preparing a message for a dead letter queue + * for example, you might not want update the deliveryMode of messages + * @param message + * @param poisonCause + * @return prepared message to be sent to a dead letter queue + */ + Message prepareMessageForDeadLetterQueue(Message message, Throwable poisonCause) throws IOException; + /** * Returns the dead letter queue for the given message and subscription. */ @@ -60,6 +71,16 @@ public interface DeadLetterStrategy { */ public void setProcessNonPersistent(boolean processNonPersistent); + /** + * @return the PreserveDeliveryMode + */ + public boolean isPreserveDeliveryMode(); + + /** + * @param PreserveDeliveryMode the PreserveDeliveryMode to set + */ + public void setPreserveDeliveryMode(boolean PreserveDeliveryMode); + /** * Allows for a Message that was already processed by a DLQ to be rolled back in case * of a move or a retry of that message, otherwise the Message would be considered a