diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
index b5213e9775..5b53be8012 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
@@ -161,6 +161,11 @@ public interface Message {
*/
SimpleString HDR_ROUTING_TYPE = new SimpleString("_AMQ_ROUTING_TYPE");
+ /**
+ * The time at which the message arrived at the broker.
+ */
+ SimpleString HDR_INGRESS_TIMESTAMP = new SimpleString("_AMQ_INGRESS_TIMESTAMP");
+
/**
* The prefix used (if any) when sending this message. For protocols (e.g. STOMP) that need to track this and restore
* the prefix when the message is consumed.
@@ -643,6 +648,15 @@ public interface Message {
return getObjectProperty(key);
}
+ default Message setIngressTimestamp() {
+ setBrokerProperty(HDR_INGRESS_TIMESTAMP, System.currentTimeMillis());
+ return this;
+ }
+
+ default Long getIngressTimestamp() {
+ return (Long) getBrokerProperty(HDR_INGRESS_TIMESTAMP);
+ }
+
Short getShortProperty(SimpleString key) throws ActiveMQPropertyConversionException;
Float getFloatProperty(SimpleString key) throws ActiveMQPropertyConversionException;
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPLargeMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPLargeMessage.java
index 771e745402..526ad543d6 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPLargeMessage.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPLargeMessage.java
@@ -69,6 +69,7 @@ public class AMQPLargeMessage extends AMQPMessage implements LargeServerMessage
reader.readInto(wrapbuffer);
AMQPStandardMessage standardMessage = new AMQPStandardMessage(messageFormat, buffer, extraProperties, coreMessageObjectPools);
+ standardMessage.setMessageAnnotations(messageAnnotations);
standardMessage.setMessageID(messageID);
return standardMessage.toCore();
} catch (Exception e) {
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
index f32c285b35..8606429339 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
@@ -262,6 +262,10 @@ public abstract class AMQPMessage extends RefCountMessage implements org.apache.
return applicationProperties;
}
+ protected MessageAnnotations getDecodedMessageAnnotations() {
+ return messageAnnotations;
+ }
+
protected abstract ReadableBuffer getData();
// Access to the AMQP message data using safe copies freshly decoded from the current
@@ -586,6 +590,10 @@ public abstract class AMQPMessage extends RefCountMessage implements org.apache.
getMessageAnnotationsMap(true).put(annotation, value);
}
+ protected void setMessageAnnotations(MessageAnnotations messageAnnotations) {
+ this.messageAnnotations = messageAnnotations;
+ }
+
// Message decoding and copying methods. Care must be taken here to ensure the buffer and the
// state tracking information is kept up to data. When the message is manually changed a forced
// re-encode should be done to update the backing data with the in memory elements.
@@ -1351,6 +1359,17 @@ public abstract class AMQPMessage extends RefCountMessage implements org.apache.
return extra.getProperty(key);
}
+ @Override
+ public final org.apache.activemq.artemis.api.core.Message setIngressTimestamp() {
+ setMessageAnnotation(AMQPMessageSupport.INGRESS_TIME_MSG_ANNOTATION, System.currentTimeMillis());
+ return this;
+ }
+
+ @Override
+ public Long getIngressTimestamp() {
+ return (Long) getMessageAnnotation(AMQPMessageSupport.INGRESS_TIME_MSG_ANNOTATION);
+ }
+
// JMS Style property access methods. These can result in additional decode of AMQP message
// data from Application properties. Updates to application properties puts the message in a
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessageBrokerAccessor.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessageBrokerAccessor.java
index a2cd44d90c..bab9ca9b7b 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessageBrokerAccessor.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessageBrokerAccessor.java
@@ -20,6 +20,7 @@ package org.apache.activemq.artemis.protocol.amqp.broker;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.ApplicationProperties;
import org.apache.qpid.proton.amqp.messaging.Header;
+import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
import org.apache.qpid.proton.amqp.messaging.Properties;
/** Warning: do not use this class outside of the broker implementation.
@@ -56,4 +57,8 @@ public class AMQPMessageBrokerAccessor {
return message.getCurrentProperties();
}
+ public static MessageAnnotations getDecodedMessageAnnotations(AMQPMessage message) {
+ return message.getDecodedMessageAnnotations();
+ }
+
}
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AMQPMessageSupport.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AMQPMessageSupport.java
index 918d987ce3..c8e1eba1aa 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AMQPMessageSupport.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AMQPMessageSupport.java
@@ -183,6 +183,8 @@ public final class AMQPMessageSupport {
public static final String X_OPT_PREFIX = "x-opt-";
public static final String AMQ_PROPERTY_PREFIX = "_AMQ_";
+ public static final String X_OPT_INGRESS_TIME = X_OPT_PREFIX + "ingress-time";
+
public static final short AMQP_UNKNOWN = 0;
public static final short AMQP_NULL = 1;
public static final short AMQP_DATA = 2;
@@ -195,6 +197,7 @@ public final class AMQPMessageSupport {
public static final Symbol JMS_DEST_TYPE_MSG_ANNOTATION = getSymbol("x-opt-jms-dest");
public static final Symbol JMS_REPLY_TO_TYPE_MSG_ANNOTATION = getSymbol("x-opt-jms-reply-to");
+ public static final Symbol INGRESS_TIME_MSG_ANNOTATION = getSymbol(X_OPT_INGRESS_TIME);
public static final byte QUEUE_TYPE = 0x00;
public static final byte TOPIC_TYPE = 0x01;
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AmqpCoreConverter.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AmqpCoreConverter.java
index 8071dbedc4..3f14b308ec 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AmqpCoreConverter.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AmqpCoreConverter.java
@@ -17,6 +17,7 @@
package org.apache.activemq.artemis.protocol.amqp.converter;
+import static org.apache.activemq.artemis.api.core.Message.HDR_INGRESS_TIMESTAMP;
import static org.apache.activemq.artemis.api.core.Message.HDR_SCHEDULED_DELIVERY_TIME;
import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.AMQP_DATA;
import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.AMQP_NULL;
@@ -292,6 +293,8 @@ public class AmqpCoreConverter {
if (delay > 0) {
jms.setLongProperty(HDR_SCHEDULED_DELIVERY_TIME.toString(), System.currentTimeMillis() + delay);
}
+ } else if (AMQPMessageSupport.X_OPT_INGRESS_TIME.equals(key) && entry.getValue() != null) {
+ jms.setLongProperty(HDR_INGRESS_TIMESTAMP.toString(), ((Number) entry.getValue()).longValue());
}
try {
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/CoreAmqpConverter.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/CoreAmqpConverter.java
index ee7659e989..bd27f8cc21 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/CoreAmqpConverter.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/CoreAmqpConverter.java
@@ -295,6 +295,9 @@ public class CoreAmqpConverter {
} else if (key.endsWith(HDR_SCHEDULED_DELIVERY_TIME.toString())) {
// skip..remove annotation from previous inbound transformation
continue;
+ } else if (key.equals(Message.HDR_INGRESS_TIMESTAMP.toString())) {
+ maMap.put(AMQPMessageSupport.INGRESS_TIME_MSG_ANNOTATION, message.getLongProperty(key));
+ continue;
}
if (apMap == null) {
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
index c65d86d0e0..b383405c2a 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
@@ -70,6 +70,7 @@ import org.apache.qpid.proton.amqp.messaging.Accepted;
import org.apache.qpid.proton.amqp.messaging.ApplicationProperties;
import org.apache.qpid.proton.amqp.messaging.DeliveryAnnotations;
import org.apache.qpid.proton.amqp.messaging.Header;
+import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
import org.apache.qpid.proton.amqp.messaging.Modified;
import org.apache.qpid.proton.amqp.messaging.Outcome;
import org.apache.qpid.proton.amqp.messaging.Properties;
@@ -701,7 +702,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
try {
int proposedPosition = writeHeaderAndAnnotations(context, deliveryAnnotationsToEncode);
if (message.isReencoded()) {
- proposedPosition = writePropertiesAndApplicationProperties(context, message);
+ proposedPosition = writeMessageAnnotationsPropertiesAndApplicationProperties(context, message);
}
context.position(proposedPosition);
@@ -716,14 +717,20 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
/**
* Write properties and application properties when the message is flagged as re-encoded.
*/
- private int writePropertiesAndApplicationProperties(LargeBodyReader context, AMQPLargeMessage message) throws Exception {
+ private int writeMessageAnnotationsPropertiesAndApplicationProperties(LargeBodyReader context, AMQPLargeMessage message) throws Exception {
int bodyPosition = AMQPMessageBrokerAccessor.getRemainingBodyPosition(message);
assert bodyPosition > 0;
- writePropertiesAndApplicationPropertiesInternal(message);
+ writeMessageAnnotationsPropertiesAndApplicationPropertiesInternal(message);
return bodyPosition;
}
- private void writePropertiesAndApplicationPropertiesInternal(AMQPLargeMessage message) {
+ private void writeMessageAnnotationsPropertiesAndApplicationPropertiesInternal(AMQPLargeMessage message) {
+ MessageAnnotations messageAnnotations = AMQPMessageBrokerAccessor.getDecodedMessageAnnotations(message);
+
+ if (messageAnnotations != null) {
+ TLSEncode.getEncoder().writeObject(messageAnnotations);
+ }
+
Properties amqpProperties = AMQPMessageBrokerAccessor.getCurrentProperties(message);
if (amqpProperties != null) {
TLSEncode.getEncoder().writeObject(amqpProperties);
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
index 136f7b936c..337cefdc01 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
@@ -686,6 +686,11 @@ public final class OpenWireMessageConverter {
setAMQMsgHdrLastValueName(amqMsg, lastValueProperty);
}
+ final Long ingressTimestamp = coreMessage.getPropertyNames().contains(org.apache.activemq.artemis.api.core.Message.HDR_INGRESS_TIMESTAMP) ? coreMessage.getLongProperty(org.apache.activemq.artemis.api.core.Message.HDR_INGRESS_TIMESTAMP) : null;
+ if (ingressTimestamp != null) {
+ setAMQMsgHdrIngressTimestamp(amqMsg, ingressTimestamp);
+ }
+
final Set props = coreMessage.getPropertyNames();
if (props != null) {
setAMQMsgObjectProperties(amqMsg, coreMessage, props);
@@ -937,6 +942,15 @@ public final class OpenWireMessageConverter {
}
}
+ private static void setAMQMsgHdrIngressTimestamp(final ActiveMQMessage amqMsg,
+ final Long ingressTimestamp) throws IOException {
+ try {
+ amqMsg.setLongProperty(org.apache.activemq.artemis.api.core.Message.HDR_INGRESS_TIMESTAMP.toString(), ingressTimestamp);
+ } catch (JMSException e) {
+ throw new IOException("failure to set ingress timestamp property " + ingressTimestamp, e);
+ }
+ }
+
private static void setAMQMsgObjectProperties(final ActiveMQMessage amqMsg,
final ICoreMessage coreMessage,
final Set props) throws IOException {
diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/Stomp.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/Stomp.java
index c965302600..5ec24f3fbe 100644
--- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/Stomp.java
+++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/Stomp.java
@@ -135,6 +135,8 @@ public interface Stomp {
String PERSISTENT = "persistent";
String VALIDATED_USER = "JMSXUserID";
+
+ String INGRESS_TIMESTAMP = "ingress-timestamp";
}
interface Subscribe {
diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompUtils.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompUtils.java
index 07dcd8fd15..8829e40975 100644
--- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompUtils.java
+++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompUtils.java
@@ -132,6 +132,9 @@ public class StompUtils {
if (message.containsProperty(Message.HDR_ROUTING_TYPE)) {
command.addHeader(Stomp.Headers.Send.DESTINATION_TYPE, RoutingType.getType(message.getByteProperty(Message.HDR_ROUTING_TYPE.toString())).toString());
}
+ if (message.containsProperty(Message.HDR_INGRESS_TIMESTAMP)) {
+ command.addHeader(Stomp.Headers.Message.INGRESS_TIMESTAMP, Long.toString(message.getLongProperty(Message.HDR_INGRESS_TIMESTAMP)));
+ }
// now let's add all the rest of the message headers
Set names = message.getPropertyNames();
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
index 915c2e7c70..2d8b81fc08 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
@@ -311,6 +311,7 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
private static final String ENABLE_METRICS = "enable-metrics";
+ private static final String ENABLE_INGRESS_TIMESTAMP = "enable-ingress-timestamp";
// Attributes ----------------------------------------------------
@@ -1361,6 +1362,8 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
addressSettings.setExpiryQueueSuffix(new SimpleString(getTrimmedTextContent(child)));
} else if (ENABLE_METRICS.equalsIgnoreCase(name)) {
addressSettings.setEnableMetrics(XMLUtil.parseBoolean(child));
+ } else if (ENABLE_INGRESS_TIMESTAMP.equalsIgnoreCase(name)) {
+ addressSettings.setEnableIngressTimestamp(XMLUtil.parseBoolean(child));
}
}
return setting;
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
index 542a974fac..db9e2cb7d3 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
@@ -2181,6 +2181,11 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
throw ActiveMQMessageBundle.BUNDLE.rejectEmptyValidatedUser();
}
+ if (server.getAddressSettingsRepository().getMatch(msg.getAddress()).isEnableIngressTimestamp()) {
+ msg.setIngressTimestamp();
+ msg.reencode();
+ }
+
if (tx == null || autoCommitSends) {
routingContext.setTransaction(null);
} else {
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java
index 8f26b4e1dd..79cece3e61 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java
@@ -133,6 +133,8 @@ public class AddressSettings implements Mergeable, Serializable
public static final SlowConsumerThresholdMeasurementUnit DEFAULT_SLOW_CONSUMER_THRESHOLD_MEASUREMENT_UNIT = SlowConsumerThresholdMeasurementUnit.MESSAGES_PER_SECOND;
+ public static final boolean DEFAULT_ENABLE_INGRESS_TIMESTAMP = false;
+
private AddressFullMessagePolicy addressFullMessagePolicy = null;
private Long maxSizeBytes = null;
@@ -265,6 +267,8 @@ public class AddressSettings implements Mergeable, Serializable
private Integer managementMessageAttributeSizeLimit = null;
+ private Boolean enableIngressTimestamp = null;
+
//from amq5
//make it transient
private transient Integer queuePrefetch = null;
@@ -332,6 +336,7 @@ public class AddressSettings implements Mergeable, Serializable
this.enableMetrics = other.enableMetrics;
this.managementMessageAttributeSizeLimit = other.managementMessageAttributeSizeLimit;
this.slowConsumerThresholdMeasurementUnit = other.slowConsumerThresholdMeasurementUnit;
+ this.enableIngressTimestamp = other.enableIngressTimestamp;
}
public AddressSettings() {
@@ -955,6 +960,15 @@ public class AddressSettings implements Mergeable, Serializable
return this;
}
+ public boolean isEnableIngressTimestamp() {
+ return enableIngressTimestamp != null ? enableIngressTimestamp : AddressSettings.DEFAULT_ENABLE_INGRESS_TIMESTAMP;
+ }
+
+ public AddressSettings setEnableIngressTimestamp(final boolean enableIngressTimestamp) {
+ this.enableIngressTimestamp = enableIngressTimestamp;
+ return this;
+ }
+
/**
* merge 2 objects in to 1
*
@@ -1154,6 +1168,9 @@ public class AddressSettings implements Mergeable, Serializable
if (enableMetrics == null) {
enableMetrics = merged.enableMetrics;
}
+ if (enableIngressTimestamp == null) {
+ enableIngressTimestamp = merged.enableIngressTimestamp;
+ }
}
@Override
@@ -1377,6 +1394,10 @@ public class AddressSettings implements Mergeable, Serializable
slowConsumerThresholdMeasurementUnit = SlowConsumerThresholdMeasurementUnit.valueOf(slowConsumerMeasurementUnitEnumValue);
}
}
+
+ if (buffer.readableBytes() > 0) {
+ enableIngressTimestamp = BufferHelper.readNullableBoolean(buffer);
+ }
}
@Override
@@ -1442,7 +1463,8 @@ public class AddressSettings implements Mergeable, Serializable
BufferHelper.sizeOfNullableBoolean(enableMetrics) +
BufferHelper.sizeOfNullableBoolean(defaultGroupRebalancePauseDispatch) +
BufferHelper.sizeOfNullableInteger(managementMessageAttributeSizeLimit) +
- BufferHelper.sizeOfNullableInteger(slowConsumerThresholdMeasurementUnit.getValue());
+ BufferHelper.sizeOfNullableInteger(slowConsumerThresholdMeasurementUnit.getValue()) +
+ BufferHelper.sizeOfNullableBoolean(enableIngressTimestamp);
}
@Override
@@ -1572,6 +1594,8 @@ public class AddressSettings implements Mergeable, Serializable
BufferHelper.writeNullableInteger(buffer, managementMessageAttributeSizeLimit);
BufferHelper.writeNullableInteger(buffer, slowConsumerThresholdMeasurementUnit == null ? null : slowConsumerThresholdMeasurementUnit.getValue());
+
+ BufferHelper.writeNullableBoolean(buffer, enableIngressTimestamp);
}
/* (non-Javadoc)
@@ -1646,6 +1670,7 @@ public class AddressSettings implements Mergeable, Serializable
result = prime * result + ((enableMetrics == null) ? 0 : enableMetrics.hashCode());
result = prime * result + ((managementMessageAttributeSizeLimit == null) ? 0 : managementMessageAttributeSizeLimit.hashCode());
result = prime * result + ((slowConsumerThresholdMeasurementUnit == null) ? 0 : slowConsumerThresholdMeasurementUnit.hashCode());
+ result = prime * result + ((enableIngressTimestamp == null) ? 0 : enableIngressTimestamp.hashCode());
return result;
}
@@ -2006,6 +2031,12 @@ public class AddressSettings implements Mergeable, Serializable
if (slowConsumerThresholdMeasurementUnit != other.slowConsumerThresholdMeasurementUnit)
return false;
+ if (enableIngressTimestamp == null) {
+ if (other.enableIngressTimestamp != null)
+ return false;
+ } else if (!enableIngressTimestamp.equals(other.enableIngressTimestamp))
+ return false;
+
return true;
}
@@ -2141,6 +2172,8 @@ public class AddressSettings implements Mergeable, Serializable
expiryQueueSuffix +
", enableMetrics=" +
enableMetrics +
+ ", enableIngressTime=" +
+ enableIngressTimestamp +
"]";
}
}
diff --git a/artemis-server/src/main/resources/schema/artemis-configuration.xsd b/artemis-server/src/main/resources/schema/artemis-configuration.xsd
index ad612d2f24..614e056a82 100644
--- a/artemis-server/src/main/resources/schema/artemis-configuration.xsd
+++ b/artemis-server/src/main/resources/schema/artemis-configuration.xsd
@@ -3970,6 +3970,14 @@
+
+
+
+ whether or not the broker should set its own timestamp on incoming messages to the matching address
+
+
+
+
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java
index 7878098e72..07f11ab558 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java
@@ -384,6 +384,7 @@ public class FileConfigurationTest extends ConfigurationImplTest {
assertEquals(3, conf.getAddressesSettings().get("a1").getDefaultRingSize());
assertEquals(0, conf.getAddressesSettings().get("a1").getRetroactiveMessageCount());
assertTrue(conf.getAddressesSettings().get("a1").isEnableMetrics());
+ assertTrue(conf.getAddressesSettings().get("a1").isEnableIngressTimestamp());
assertEquals("a2.1", conf.getAddressesSettings().get("a2").getDeadLetterAddress().toString());
assertEquals(true, conf.getAddressesSettings().get("a2").isAutoCreateDeadLetterResources());
@@ -420,6 +421,7 @@ public class FileConfigurationTest extends ConfigurationImplTest {
assertEquals(-1, conf.getAddressesSettings().get("a2").getDefaultRingSize());
assertEquals(10, conf.getAddressesSettings().get("a2").getRetroactiveMessageCount());
assertFalse(conf.getAddressesSettings().get("a2").isEnableMetrics());
+ assertFalse(conf.getAddressesSettings().get("a2").isEnableIngressTimestamp());
assertTrue(conf.getResourceLimitSettings().containsKey("myUser"));
assertEquals(104, conf.getResourceLimitSettings().get("myUser").getMaxConnections());
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/message/impl/MessagePropertyTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/message/impl/MessagePropertyTest.java
index a31e09f254..0f20b82e1a 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/message/impl/MessagePropertyTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/message/impl/MessagePropertyTest.java
@@ -101,6 +101,7 @@ public class MessagePropertyTest extends ActiveMQTestBase {
assertEquals(floatValue(i), message.getFloatProperty("float").floatValue(), 0.001);
assertEquals(new SimpleString(Integer.toString(i)), message.getSimpleStringProperty(SIMPLE_STRING_KEY.toString()));
assertEqualsByteArrays(byteArray(i), message.getBytesProperty("byte[]"));
+ assertNull(message.getIngressTimestamp());
assertTrue(message.containsProperty("null-value"));
assertEquals(message.getObjectProperty("null-value"), null);
diff --git a/artemis-server/src/test/resources/ConfigurationTest-full-config.xml b/artemis-server/src/test/resources/ConfigurationTest-full-config.xml
index a72459c5bd..e0a6a5ab99 100644
--- a/artemis-server/src/test/resources/ConfigurationTest-full-config.xml
+++ b/artemis-server/src/test/resources/ConfigurationTest-full-config.xml
@@ -473,6 +473,7 @@
ANYCAST
MULTICAST
3
+ true
a2.1
diff --git a/artemis-server/src/test/resources/ConfigurationTest-xinclude-config-address-settings.xml b/artemis-server/src/test/resources/ConfigurationTest-xinclude-config-address-settings.xml
index 846c0e5a5b..445c1842f6 100644
--- a/artemis-server/src/test/resources/ConfigurationTest-xinclude-config-address-settings.xml
+++ b/artemis-server/src/test/resources/ConfigurationTest-xinclude-config-address-settings.xml
@@ -44,6 +44,7 @@
ANYCAST
MULTICAST
3
+ true
a2.1
diff --git a/docs/user-manual/en/address-model.md b/docs/user-manual/en/address-model.md
index 599b871e5a..e3635afd14 100644
--- a/docs/user-manual/en/address-model.md
+++ b/docs/user-manual/en/address-model.md
@@ -691,6 +691,7 @@ that would be found in the `broker.xml` file.
-1
0
true
+ false
```
@@ -991,3 +992,12 @@ queues created on the matching address. Defaults to 0. Read more about
`enable-metrics` determines whether or not metrics will be published to any
configured metrics plugin for the matching address. Default is `true`. Read more
about [metrics](metrics.md).
+
+`enable-ingress-timestamp` determines whether or not the broker will add its time
+to messages sent to the matching address. When `true` the exact behavior will
+depend on the specific protocol in use. For AMQP messages the broker will add a
+`long` *message annotation* named `x-opt-ingress-time`. For core messages (used by
+the core and OpenWire protocols) the broker will add a long property named
+`_AMQ_INGRESS_TIMESTAMP`. For STOMP messages the broker will add a frame header
+named `ingress-timestamp`. The value will be the number of milliseconds since the
+[epoch](https://en.wikipedia.org/wiki/Unix_time). Default is `false`.
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java
index 7d65cdb627..7e154426b5 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java
@@ -16,6 +16,12 @@
*/
package org.apache.activemq.artemis.tests.integration.amqp;
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.DeliveryMode;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
import javax.management.MBeanServer;
import javax.management.MBeanServerFactory;
import java.net.URI;
@@ -25,10 +31,17 @@ import java.util.HashSet;
import java.util.Map;
import java.util.Set;
+import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
+import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
+import org.apache.activemq.artemis.api.core.client.ClientMessage;
+import org.apache.activemq.artemis.api.core.client.ClientProducer;
+import org.apache.activemq.artemis.api.core.client.ClientSession;
+import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
+import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
import org.apache.activemq.artemis.core.security.Role;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
@@ -385,6 +398,10 @@ public class AmqpClientTestSupport extends AmqpTestSupport {
}
protected void sendMessages(String destinationName, int count, boolean durable) throws Exception {
+ sendMessages(destinationName, count, durable, null);
+ }
+
+ protected void sendMessages(String destinationName, int count, boolean durable, byte[] payload) throws Exception {
AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect());
try {
@@ -395,6 +412,9 @@ public class AmqpClientTestSupport extends AmqpTestSupport {
AmqpMessage message = new AmqpMessage();
message.setMessageId("MessageID:" + i);
message.setDurable(durable);
+ if (payload != null) {
+ message.setBytes(payload);
+ }
sender.send(message);
}
} finally {
@@ -402,6 +422,57 @@ public class AmqpClientTestSupport extends AmqpTestSupport {
}
}
+ protected void sendMessagesCore(String destinationName, int count, boolean durable) throws Exception {
+ sendMessagesCore(destinationName, count, durable, null);
+ }
+
+ protected void sendMessagesCore(String destinationName, int count, boolean durable, byte[] body) throws Exception {
+ ServerLocator serverLocator = ActiveMQClient.createServerLocator("tcp://127.0.0.1:5672");
+ ClientSessionFactory clientSessionFactory = serverLocator.createSessionFactory();
+ ClientSession session = clientSessionFactory.createSession();
+ try {
+ ClientProducer sender = session.createProducer(destinationName);
+
+ for (int i = 0; i < count; ++i) {
+ ClientMessage message = session.createMessage(durable);
+ if (body != null) {
+ message.getBodyBuffer().writeBytes(body);
+ }
+ sender.send(message);
+ }
+ } finally {
+ session.close();
+ }
+ }
+
+ protected void sendMessagesOpenWire(String destinationName, int count, boolean durable) throws Exception {
+ sendMessagesOpenWire(destinationName, count, durable, null);
+ }
+
+ protected void sendMessagesOpenWire(String destinationName, int count, boolean durable, byte[] payload) throws Exception {
+ ConnectionFactory cf = new ActiveMQConnectionFactory("tcp://127.0.0.1:5672");
+ Connection connection = cf.createConnection();
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ try {
+ MessageProducer producer = session.createProducer(session.createQueue(destinationName));
+ if (durable) {
+ producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+ } else {
+ producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+ }
+
+ for (int i = 0; i < count; ++i) {
+ BytesMessage message = session.createBytesMessage();
+ if (payload != null) {
+ message.writeBytes(payload);
+ }
+ producer.send(message);
+ }
+ } finally {
+ connection.close();
+ }
+ }
+
protected Source createDynamicSource(boolean topic) {
Source source = new Source();
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpIngressTimestampTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpIngressTimestampTest.java
new file mode 100644
index 0000000000..798aa32c45
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpIngressTimestampTest.java
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.integration.amqp;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.activemq.artemis.api.core.QueueConfiguration;
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
+import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport;
+import org.apache.activemq.artemis.tests.util.RandomUtil;
+import org.apache.activemq.artemis.tests.util.Wait;
+import org.apache.activemq.transport.amqp.client.AmqpClient;
+import org.apache.activemq.transport.amqp.client.AmqpConnection;
+import org.apache.activemq.transport.amqp.client.AmqpMessage;
+import org.apache.activemq.transport.amqp.client.AmqpReceiver;
+import org.apache.activemq.transport.amqp.client.AmqpSession;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class AmqpIngressTimestampTest extends AmqpClientTestSupport {
+
+ public int amqpMinLargeMessageSize = ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE;
+
+ @Parameterized.Parameters(name = "restart={0}, large={1}")
+ public static Collection