AMQ-7397 Add preserveDeliveryMode to deadLetterStrategy.

This commit is contained in:
Patrick Deasy 2024-11-15 10:15:40 -08:00
parent 3400983a22
commit 1d686f001a
3 changed files with 69 additions and 28 deletions

View File

@ -81,7 +81,6 @@ import org.slf4j.LoggerFactory;
* Routes Broker operations to the correct messaging regions for processing.
*/
public class RegionBroker extends EmptyBroker {
public static final String ORIGINAL_EXPIRATION = "originalExpiration";
private static final Logger LOG = LoggerFactory.getLogger(RegionBroker.class);
private static final IdGenerator BROKER_ID_GENERATOR = new IdGenerator();
@ -749,16 +748,6 @@ public class RegionBroker extends EmptyBroker {
return messageReference.canProcessAsExpired();
}
private boolean stampAsExpired(Message message) throws IOException {
boolean stamped = false;
if (message.getProperty(ORIGINAL_EXPIRATION) == null) {
long expiration = message.getExpiration();
message.setProperty(ORIGINAL_EXPIRATION, expiration);
stamped = true;
}
return stamped;
}
@Override
public void messageExpired(ConnectionContext context, MessageReference node, Subscription subscription) {
LOG.debug("Message expired {}", node);
@ -781,23 +770,8 @@ public class RegionBroker extends EmptyBroker {
return false;
}
// message may be inflight to other subscriptions so do not modify
message = message.copy();
long dlqExpiration = deadLetterStrategy.getExpiration();
if (dlqExpiration > 0) {
dlqExpiration += System.currentTimeMillis();
} else {
stampAsExpired(message);
}
message.setExpiration(dlqExpiration);
if (!message.isPersistent()) {
message.setPersistent(true);
message.setProperty("originalDeliveryMode", "NON_PERSISTENT");
}
if (poisonCause != null) {
message.setProperty(ActiveMQMessage.DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY,
poisonCause.toString());
}
message = deadLetterStrategy.prepareMessageForDeadLetterQueue(message, poisonCause);
// The original destination and transaction id do
// not get filled when the message is first sent,
// it is only populated if the message is routed to

View File

@ -17,19 +17,24 @@
package org.apache.activemq.broker.region.policy;
import org.apache.activemq.ActiveMQMessageAudit;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
/**
* A strategy for choosing which destination is used for dead letter queue
* messages.
*
*/
public abstract class AbstractDeadLetterStrategy implements DeadLetterStrategy {
public static final String ORIGINAL_EXPIRATION = "originalExpiration";
private static final Logger LOG = LoggerFactory.getLogger(AbstractDeadLetterStrategy.class);
private boolean processNonPersistent = false;
private boolean processExpired = true;
private boolean preserveDeliveryMode = false;
private boolean enableAudit = true;
private long expiration;
@ -59,6 +64,39 @@ public abstract class AbstractDeadLetterStrategy implements DeadLetterStrategy {
return result;
}
@Override
public Message prepareMessageForDeadLetterQueue(Message message, Throwable poisonCause) throws IOException {
// message may be inflight to other subscriptions so do not modify
message = message.copy();
long dlqExpiration = expiration;
if (dlqExpiration > 0) {
dlqExpiration += System.currentTimeMillis();
} else {
stampAsExpired(message);
}
message.setExpiration(dlqExpiration);
if (!message.isPersistent() && !preserveDeliveryMode) {
message.setPersistent(true);
message.setProperty("originalDeliveryMode", "NON_PERSISTENT");
}
if (poisonCause != null) {
message.setProperty(ActiveMQMessage.DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY,
poisonCause.toString());
}
return message;
}
private boolean stampAsExpired(Message message) throws IOException {
boolean stamped = false;
if (message.getProperty(ORIGINAL_EXPIRATION) == null) {
long expiration = message.getExpiration();
message.setProperty(ORIGINAL_EXPIRATION, expiration);
stamped = true;
}
return stamped;
}
/**
* @return the processExpired
*/
@ -91,6 +129,14 @@ public abstract class AbstractDeadLetterStrategy implements DeadLetterStrategy {
this.processNonPersistent = processNonPersistent;
}
@Override
public boolean isPreserveDeliveryMode() { return this.preserveDeliveryMode; }
@Override
public void setPreserveDeliveryMode(boolean preserveDeliveryMode) {
this.preserveDeliveryMode = preserveDeliveryMode;
}
public boolean isEnableAudit() {
return enableAudit;
}

View File

@ -20,6 +20,8 @@ import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.Message;
import java.io.IOException;
/**
* A strategy for choosing which destination is used for dead letter queue messages.
*
@ -35,6 +37,15 @@ public interface DeadLetterStrategy {
*/
boolean isSendToDeadLetterQueue(Message message);
/**
* Allow pluggable strategy for preparing a message for a dead letter queue
* for example, you might not want update the deliveryMode of messages
* @param message
* @param poisonCause
* @return prepared message to be sent to a dead letter queue
*/
Message prepareMessageForDeadLetterQueue(Message message, Throwable poisonCause) throws IOException;
/**
* Returns the dead letter queue for the given message and subscription.
*/
@ -60,6 +71,16 @@ public interface DeadLetterStrategy {
*/
public void setProcessNonPersistent(boolean processNonPersistent);
/**
* @return the PreserveDeliveryMode
*/
public boolean isPreserveDeliveryMode();
/**
* @param PreserveDeliveryMode the PreserveDeliveryMode to set
*/
public void setPreserveDeliveryMode(boolean PreserveDeliveryMode);
/**
* Allows for a Message that was already processed by a DLQ to be rolled back in case
* of a move or a retry of that message, otherwise the Message would be considered a