[AMQ-8320] Add JMS 2.0 JMSProducer deliveryDelay support

This commit is contained in:
Matt Pavlovich 2024-02-23 09:28:08 -06:00
parent 7b9d0aefd3
commit 9bb22a3fd5
No known key found for this signature in database
3 changed files with 16 additions and 2 deletions

View File

@ -629,6 +629,9 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
} }
}); });
} }
if (m.propertyExists(ActiveMQMessage.JMS_DELIVERY_TIME_PROPERTY)) {
m.setJMSDeliveryTime(m.getLongProperty(ActiveMQMessage.JMS_DELIVERY_TIME_PROPERTY));
}
return m; return m;
} }

View File

@ -35,6 +35,7 @@ import jakarta.jms.MessageFormatRuntimeException;
import jakarta.jms.ObjectMessage; import jakarta.jms.ObjectMessage;
import jakarta.jms.TextMessage; import jakarta.jms.TextMessage;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.util.JMSExceptionSupport; import org.apache.activemq.util.JMSExceptionSupport;
import org.apache.activemq.util.TypeConversionSupport; import org.apache.activemq.util.TypeConversionSupport;
@ -46,6 +47,7 @@ public class ActiveMQProducer implements JMSProducer {
// QoS override of defaults on a per-JMSProducer instance basis // QoS override of defaults on a per-JMSProducer instance basis
private String correlationId = null; private String correlationId = null;
private byte[] correlationIdBytes = null; private byte[] correlationIdBytes = null;
private Long deliveryDelay = null;
private Integer deliveryMode = null; private Integer deliveryMode = null;
private Boolean disableMessageID = false; private Boolean disableMessageID = false;
private Boolean disableMessageTimestamp = false; private Boolean disableMessageTimestamp = false;
@ -87,6 +89,13 @@ public class ActiveMQProducer implements JMSProducer {
} }
} }
// [AMQ-8320] Producer setting for deliveryDelay will override user-specified ActiveMQ Scheduled Delay property
if(this.deliveryDelay != null) {
long deliveryTimeMillis = System.currentTimeMillis() + this.deliveryDelay;
message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, this.deliveryDelay);
message.setLongProperty(ActiveMQMessage.JMS_DELIVERY_TIME_PROPERTY, deliveryTimeMillis);
}
activemqMessageProducer.send(destination, message, getDeliveryMode(), getPriority(), getTimeToLive(), getDisableMessageID(), getDisableMessageTimestamp(), null); activemqMessageProducer.send(destination, message, getDeliveryMode(), getPriority(), getTimeToLive(), getDisableMessageID(), getDisableMessageTimestamp(), null);
} catch (JMSException e) { } catch (JMSException e) {
throw JMSExceptionSupport.convertToJMSRuntimeException(e); throw JMSExceptionSupport.convertToJMSRuntimeException(e);
@ -243,12 +252,13 @@ public class ActiveMQProducer implements JMSProducer {
@Override @Override
public JMSProducer setDeliveryDelay(long deliveryDelay) { public JMSProducer setDeliveryDelay(long deliveryDelay) {
throw new UnsupportedOperationException("setDeliveryDelay(long) is not supported"); this.deliveryDelay = deliveryDelay;
return this;
} }
@Override @Override
public long getDeliveryDelay() { public long getDeliveryDelay() {
throw new UnsupportedOperationException("getDeliveryDelay() is not supported"); return this.deliveryDelay;
} }
@Override @Override

View File

@ -48,6 +48,7 @@ public class ActiveMQMessage extends Message implements org.apache.activemq.Mess
public static final byte DATA_STRUCTURE_TYPE = CommandTypes.ACTIVEMQ_MESSAGE; public static final byte DATA_STRUCTURE_TYPE = CommandTypes.ACTIVEMQ_MESSAGE;
public static final String DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY = "dlqDeliveryFailureCause"; public static final String DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY = "dlqDeliveryFailureCause";
public static final String BROKER_PATH_PROPERTY = "JMSActiveMQBrokerPath"; public static final String BROKER_PATH_PROPERTY = "JMSActiveMQBrokerPath";
public static final String JMS_DELIVERY_TIME_PROPERTY = "JMSDeliveryTime";
private static final Map<String, PropertySetter> JMS_PROPERTY_SETERS = new HashMap<String, PropertySetter>(); private static final Map<String, PropertySetter> JMS_PROPERTY_SETERS = new HashMap<String, PropertySetter>();