From b7f9807cd97812e7827abe3703dca7a1c74fc87f Mon Sep 17 00:00:00 2001 From: Justin Bertram Date: Tue, 1 Dec 2020 11:02:25 -0600 Subject: [PATCH] ARTEMIS-2919 support timestamping incoming messages --- .../activemq/artemis/api/core/Message.java | 14 ++ .../amqp/broker/AMQPLargeMessage.java | 1 + .../protocol/amqp/broker/AMQPMessage.java | 19 ++ .../broker/AMQPMessageBrokerAccessor.java | 5 + .../amqp/converter/AMQPMessageSupport.java | 3 + .../amqp/converter/AmqpCoreConverter.java | 3 + .../amqp/converter/CoreAmqpConverter.java | 3 + .../proton/ProtonServerSenderContext.java | 15 +- .../openwire/OpenWireMessageConverter.java | 14 ++ .../artemis/core/protocol/stomp/Stomp.java | 2 + .../core/protocol/stomp/StompUtils.java | 3 + .../impl/FileConfigurationParser.java | 3 + .../core/server/impl/ServerSessionImpl.java | 5 + .../core/settings/impl/AddressSettings.java | 35 ++- .../schema/artemis-configuration.xsd | 8 + .../config/impl/FileConfigurationTest.java | 2 + .../message/impl/MessagePropertyTest.java | 1 + .../ConfigurationTest-full-config.xml | 1 + ...nTest-xinclude-config-address-settings.xml | 1 + docs/user-manual/en/address-model.md | 10 + .../amqp/AmqpClientTestSupport.java | 71 ++++++ .../amqp/AmqpIngressTimestampTest.java | 154 +++++++++++++ .../client/IngressTimestampTest.java | 207 ++++++++++++++++++ .../tests/integration/stomp/StompTest.java | 21 ++ 24 files changed, 596 insertions(+), 5 deletions(-) create mode 100644 tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpIngressTimestampTest.java create mode 100644 tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/IngressTimestampTest.java diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java index b5213e9775..5b53be8012 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java @@ -161,6 +161,11 @@ public interface Message { */ SimpleString HDR_ROUTING_TYPE = new SimpleString("_AMQ_ROUTING_TYPE"); + /** + * The time at which the message arrived at the broker. + */ + SimpleString HDR_INGRESS_TIMESTAMP = new SimpleString("_AMQ_INGRESS_TIMESTAMP"); + /** * The prefix used (if any) when sending this message. For protocols (e.g. STOMP) that need to track this and restore * the prefix when the message is consumed. @@ -643,6 +648,15 @@ public interface Message { return getObjectProperty(key); } + default Message setIngressTimestamp() { + setBrokerProperty(HDR_INGRESS_TIMESTAMP, System.currentTimeMillis()); + return this; + } + + default Long getIngressTimestamp() { + return (Long) getBrokerProperty(HDR_INGRESS_TIMESTAMP); + } + Short getShortProperty(SimpleString key) throws ActiveMQPropertyConversionException; Float getFloatProperty(SimpleString key) throws ActiveMQPropertyConversionException; diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPLargeMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPLargeMessage.java index 771e745402..526ad543d6 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPLargeMessage.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPLargeMessage.java @@ -69,6 +69,7 @@ public class AMQPLargeMessage extends AMQPMessage implements LargeServerMessage reader.readInto(wrapbuffer); AMQPStandardMessage standardMessage = new AMQPStandardMessage(messageFormat, buffer, extraProperties, coreMessageObjectPools); + standardMessage.setMessageAnnotations(messageAnnotations); standardMessage.setMessageID(messageID); return standardMessage.toCore(); } catch (Exception e) { diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java index f32c285b35..8606429339 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java @@ -262,6 +262,10 @@ public abstract class AMQPMessage extends RefCountMessage implements org.apache. return applicationProperties; } + protected MessageAnnotations getDecodedMessageAnnotations() { + return messageAnnotations; + } + protected abstract ReadableBuffer getData(); // Access to the AMQP message data using safe copies freshly decoded from the current @@ -586,6 +590,10 @@ public abstract class AMQPMessage extends RefCountMessage implements org.apache. getMessageAnnotationsMap(true).put(annotation, value); } + protected void setMessageAnnotations(MessageAnnotations messageAnnotations) { + this.messageAnnotations = messageAnnotations; + } + // Message decoding and copying methods. Care must be taken here to ensure the buffer and the // state tracking information is kept up to data. When the message is manually changed a forced // re-encode should be done to update the backing data with the in memory elements. @@ -1351,6 +1359,17 @@ public abstract class AMQPMessage extends RefCountMessage implements org.apache. return extra.getProperty(key); } + @Override + public final org.apache.activemq.artemis.api.core.Message setIngressTimestamp() { + setMessageAnnotation(AMQPMessageSupport.INGRESS_TIME_MSG_ANNOTATION, System.currentTimeMillis()); + return this; + } + + @Override + public Long getIngressTimestamp() { + return (Long) getMessageAnnotation(AMQPMessageSupport.INGRESS_TIME_MSG_ANNOTATION); + } + // JMS Style property access methods. These can result in additional decode of AMQP message // data from Application properties. Updates to application properties puts the message in a diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessageBrokerAccessor.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessageBrokerAccessor.java index a2cd44d90c..bab9ca9b7b 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessageBrokerAccessor.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessageBrokerAccessor.java @@ -20,6 +20,7 @@ package org.apache.activemq.artemis.protocol.amqp.broker; import org.apache.qpid.proton.amqp.Symbol; import org.apache.qpid.proton.amqp.messaging.ApplicationProperties; import org.apache.qpid.proton.amqp.messaging.Header; +import org.apache.qpid.proton.amqp.messaging.MessageAnnotations; import org.apache.qpid.proton.amqp.messaging.Properties; /** Warning: do not use this class outside of the broker implementation. @@ -56,4 +57,8 @@ public class AMQPMessageBrokerAccessor { return message.getCurrentProperties(); } + public static MessageAnnotations getDecodedMessageAnnotations(AMQPMessage message) { + return message.getDecodedMessageAnnotations(); + } + } diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AMQPMessageSupport.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AMQPMessageSupport.java index 918d987ce3..c8e1eba1aa 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AMQPMessageSupport.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AMQPMessageSupport.java @@ -183,6 +183,8 @@ public final class AMQPMessageSupport { public static final String X_OPT_PREFIX = "x-opt-"; public static final String AMQ_PROPERTY_PREFIX = "_AMQ_"; + public static final String X_OPT_INGRESS_TIME = X_OPT_PREFIX + "ingress-time"; + public static final short AMQP_UNKNOWN = 0; public static final short AMQP_NULL = 1; public static final short AMQP_DATA = 2; @@ -195,6 +197,7 @@ public final class AMQPMessageSupport { public static final Symbol JMS_DEST_TYPE_MSG_ANNOTATION = getSymbol("x-opt-jms-dest"); public static final Symbol JMS_REPLY_TO_TYPE_MSG_ANNOTATION = getSymbol("x-opt-jms-reply-to"); + public static final Symbol INGRESS_TIME_MSG_ANNOTATION = getSymbol(X_OPT_INGRESS_TIME); public static final byte QUEUE_TYPE = 0x00; public static final byte TOPIC_TYPE = 0x01; diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AmqpCoreConverter.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AmqpCoreConverter.java index 8071dbedc4..3f14b308ec 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AmqpCoreConverter.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AmqpCoreConverter.java @@ -17,6 +17,7 @@ package org.apache.activemq.artemis.protocol.amqp.converter; +import static org.apache.activemq.artemis.api.core.Message.HDR_INGRESS_TIMESTAMP; import static org.apache.activemq.artemis.api.core.Message.HDR_SCHEDULED_DELIVERY_TIME; import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.AMQP_DATA; import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.AMQP_NULL; @@ -292,6 +293,8 @@ public class AmqpCoreConverter { if (delay > 0) { jms.setLongProperty(HDR_SCHEDULED_DELIVERY_TIME.toString(), System.currentTimeMillis() + delay); } + } else if (AMQPMessageSupport.X_OPT_INGRESS_TIME.equals(key) && entry.getValue() != null) { + jms.setLongProperty(HDR_INGRESS_TIMESTAMP.toString(), ((Number) entry.getValue()).longValue()); } try { diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/CoreAmqpConverter.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/CoreAmqpConverter.java index ee7659e989..bd27f8cc21 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/CoreAmqpConverter.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/CoreAmqpConverter.java @@ -295,6 +295,9 @@ public class CoreAmqpConverter { } else if (key.endsWith(HDR_SCHEDULED_DELIVERY_TIME.toString())) { // skip..remove annotation from previous inbound transformation continue; + } else if (key.equals(Message.HDR_INGRESS_TIMESTAMP.toString())) { + maMap.put(AMQPMessageSupport.INGRESS_TIME_MSG_ANNOTATION, message.getLongProperty(key)); + continue; } if (apMap == null) { diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java index c65d86d0e0..b383405c2a 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java @@ -70,6 +70,7 @@ import org.apache.qpid.proton.amqp.messaging.Accepted; import org.apache.qpid.proton.amqp.messaging.ApplicationProperties; import org.apache.qpid.proton.amqp.messaging.DeliveryAnnotations; import org.apache.qpid.proton.amqp.messaging.Header; +import org.apache.qpid.proton.amqp.messaging.MessageAnnotations; import org.apache.qpid.proton.amqp.messaging.Modified; import org.apache.qpid.proton.amqp.messaging.Outcome; import org.apache.qpid.proton.amqp.messaging.Properties; @@ -701,7 +702,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr try { int proposedPosition = writeHeaderAndAnnotations(context, deliveryAnnotationsToEncode); if (message.isReencoded()) { - proposedPosition = writePropertiesAndApplicationProperties(context, message); + proposedPosition = writeMessageAnnotationsPropertiesAndApplicationProperties(context, message); } context.position(proposedPosition); @@ -716,14 +717,20 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr /** * Write properties and application properties when the message is flagged as re-encoded. */ - private int writePropertiesAndApplicationProperties(LargeBodyReader context, AMQPLargeMessage message) throws Exception { + private int writeMessageAnnotationsPropertiesAndApplicationProperties(LargeBodyReader context, AMQPLargeMessage message) throws Exception { int bodyPosition = AMQPMessageBrokerAccessor.getRemainingBodyPosition(message); assert bodyPosition > 0; - writePropertiesAndApplicationPropertiesInternal(message); + writeMessageAnnotationsPropertiesAndApplicationPropertiesInternal(message); return bodyPosition; } - private void writePropertiesAndApplicationPropertiesInternal(AMQPLargeMessage message) { + private void writeMessageAnnotationsPropertiesAndApplicationPropertiesInternal(AMQPLargeMessage message) { + MessageAnnotations messageAnnotations = AMQPMessageBrokerAccessor.getDecodedMessageAnnotations(message); + + if (messageAnnotations != null) { + TLSEncode.getEncoder().writeObject(messageAnnotations); + } + Properties amqpProperties = AMQPMessageBrokerAccessor.getCurrentProperties(message); if (amqpProperties != null) { TLSEncode.getEncoder().writeObject(amqpProperties); diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java index 136f7b936c..337cefdc01 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java @@ -686,6 +686,11 @@ public final class OpenWireMessageConverter { setAMQMsgHdrLastValueName(amqMsg, lastValueProperty); } + final Long ingressTimestamp = coreMessage.getPropertyNames().contains(org.apache.activemq.artemis.api.core.Message.HDR_INGRESS_TIMESTAMP) ? coreMessage.getLongProperty(org.apache.activemq.artemis.api.core.Message.HDR_INGRESS_TIMESTAMP) : null; + if (ingressTimestamp != null) { + setAMQMsgHdrIngressTimestamp(amqMsg, ingressTimestamp); + } + final Set props = coreMessage.getPropertyNames(); if (props != null) { setAMQMsgObjectProperties(amqMsg, coreMessage, props); @@ -937,6 +942,15 @@ public final class OpenWireMessageConverter { } } + private static void setAMQMsgHdrIngressTimestamp(final ActiveMQMessage amqMsg, + final Long ingressTimestamp) throws IOException { + try { + amqMsg.setLongProperty(org.apache.activemq.artemis.api.core.Message.HDR_INGRESS_TIMESTAMP.toString(), ingressTimestamp); + } catch (JMSException e) { + throw new IOException("failure to set ingress timestamp property " + ingressTimestamp, e); + } + } + private static void setAMQMsgObjectProperties(final ActiveMQMessage amqMsg, final ICoreMessage coreMessage, final Set props) throws IOException { diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/Stomp.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/Stomp.java index c965302600..5ec24f3fbe 100644 --- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/Stomp.java +++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/Stomp.java @@ -135,6 +135,8 @@ public interface Stomp { String PERSISTENT = "persistent"; String VALIDATED_USER = "JMSXUserID"; + + String INGRESS_TIMESTAMP = "ingress-timestamp"; } interface Subscribe { diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompUtils.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompUtils.java index 07dcd8fd15..8829e40975 100644 --- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompUtils.java +++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompUtils.java @@ -132,6 +132,9 @@ public class StompUtils { if (message.containsProperty(Message.HDR_ROUTING_TYPE)) { command.addHeader(Stomp.Headers.Send.DESTINATION_TYPE, RoutingType.getType(message.getByteProperty(Message.HDR_ROUTING_TYPE.toString())).toString()); } + if (message.containsProperty(Message.HDR_INGRESS_TIMESTAMP)) { + command.addHeader(Stomp.Headers.Message.INGRESS_TIMESTAMP, Long.toString(message.getLongProperty(Message.HDR_INGRESS_TIMESTAMP))); + } // now let's add all the rest of the message headers Set names = message.getPropertyNames(); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java index 915c2e7c70..2d8b81fc08 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java @@ -311,6 +311,7 @@ public final class FileConfigurationParser extends XMLConfigurationUtil { private static final String ENABLE_METRICS = "enable-metrics"; + private static final String ENABLE_INGRESS_TIMESTAMP = "enable-ingress-timestamp"; // Attributes ---------------------------------------------------- @@ -1361,6 +1362,8 @@ public final class FileConfigurationParser extends XMLConfigurationUtil { addressSettings.setExpiryQueueSuffix(new SimpleString(getTrimmedTextContent(child))); } else if (ENABLE_METRICS.equalsIgnoreCase(name)) { addressSettings.setEnableMetrics(XMLUtil.parseBoolean(child)); + } else if (ENABLE_INGRESS_TIMESTAMP.equalsIgnoreCase(name)) { + addressSettings.setEnableIngressTimestamp(XMLUtil.parseBoolean(child)); } } return setting; diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java index 542a974fac..db9e2cb7d3 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java @@ -2181,6 +2181,11 @@ public class ServerSessionImpl implements ServerSession, FailureListener { throw ActiveMQMessageBundle.BUNDLE.rejectEmptyValidatedUser(); } + if (server.getAddressSettingsRepository().getMatch(msg.getAddress()).isEnableIngressTimestamp()) { + msg.setIngressTimestamp(); + msg.reencode(); + } + if (tx == null || autoCommitSends) { routingContext.setTransaction(null); } else { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java index 8f26b4e1dd..79cece3e61 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java @@ -133,6 +133,8 @@ public class AddressSettings implements Mergeable, Serializable public static final SlowConsumerThresholdMeasurementUnit DEFAULT_SLOW_CONSUMER_THRESHOLD_MEASUREMENT_UNIT = SlowConsumerThresholdMeasurementUnit.MESSAGES_PER_SECOND; + public static final boolean DEFAULT_ENABLE_INGRESS_TIMESTAMP = false; + private AddressFullMessagePolicy addressFullMessagePolicy = null; private Long maxSizeBytes = null; @@ -265,6 +267,8 @@ public class AddressSettings implements Mergeable, Serializable private Integer managementMessageAttributeSizeLimit = null; + private Boolean enableIngressTimestamp = null; + //from amq5 //make it transient private transient Integer queuePrefetch = null; @@ -332,6 +336,7 @@ public class AddressSettings implements Mergeable, Serializable this.enableMetrics = other.enableMetrics; this.managementMessageAttributeSizeLimit = other.managementMessageAttributeSizeLimit; this.slowConsumerThresholdMeasurementUnit = other.slowConsumerThresholdMeasurementUnit; + this.enableIngressTimestamp = other.enableIngressTimestamp; } public AddressSettings() { @@ -955,6 +960,15 @@ public class AddressSettings implements Mergeable, Serializable return this; } + public boolean isEnableIngressTimestamp() { + return enableIngressTimestamp != null ? enableIngressTimestamp : AddressSettings.DEFAULT_ENABLE_INGRESS_TIMESTAMP; + } + + public AddressSettings setEnableIngressTimestamp(final boolean enableIngressTimestamp) { + this.enableIngressTimestamp = enableIngressTimestamp; + return this; + } + /** * merge 2 objects in to 1 * @@ -1154,6 +1168,9 @@ public class AddressSettings implements Mergeable, Serializable if (enableMetrics == null) { enableMetrics = merged.enableMetrics; } + if (enableIngressTimestamp == null) { + enableIngressTimestamp = merged.enableIngressTimestamp; + } } @Override @@ -1377,6 +1394,10 @@ public class AddressSettings implements Mergeable, Serializable slowConsumerThresholdMeasurementUnit = SlowConsumerThresholdMeasurementUnit.valueOf(slowConsumerMeasurementUnitEnumValue); } } + + if (buffer.readableBytes() > 0) { + enableIngressTimestamp = BufferHelper.readNullableBoolean(buffer); + } } @Override @@ -1442,7 +1463,8 @@ public class AddressSettings implements Mergeable, Serializable BufferHelper.sizeOfNullableBoolean(enableMetrics) + BufferHelper.sizeOfNullableBoolean(defaultGroupRebalancePauseDispatch) + BufferHelper.sizeOfNullableInteger(managementMessageAttributeSizeLimit) + - BufferHelper.sizeOfNullableInteger(slowConsumerThresholdMeasurementUnit.getValue()); + BufferHelper.sizeOfNullableInteger(slowConsumerThresholdMeasurementUnit.getValue()) + + BufferHelper.sizeOfNullableBoolean(enableIngressTimestamp); } @Override @@ -1572,6 +1594,8 @@ public class AddressSettings implements Mergeable, Serializable BufferHelper.writeNullableInteger(buffer, managementMessageAttributeSizeLimit); BufferHelper.writeNullableInteger(buffer, slowConsumerThresholdMeasurementUnit == null ? null : slowConsumerThresholdMeasurementUnit.getValue()); + + BufferHelper.writeNullableBoolean(buffer, enableIngressTimestamp); } /* (non-Javadoc) @@ -1646,6 +1670,7 @@ public class AddressSettings implements Mergeable, Serializable result = prime * result + ((enableMetrics == null) ? 0 : enableMetrics.hashCode()); result = prime * result + ((managementMessageAttributeSizeLimit == null) ? 0 : managementMessageAttributeSizeLimit.hashCode()); result = prime * result + ((slowConsumerThresholdMeasurementUnit == null) ? 0 : slowConsumerThresholdMeasurementUnit.hashCode()); + result = prime * result + ((enableIngressTimestamp == null) ? 0 : enableIngressTimestamp.hashCode()); return result; } @@ -2006,6 +2031,12 @@ public class AddressSettings implements Mergeable, Serializable if (slowConsumerThresholdMeasurementUnit != other.slowConsumerThresholdMeasurementUnit) return false; + if (enableIngressTimestamp == null) { + if (other.enableIngressTimestamp != null) + return false; + } else if (!enableIngressTimestamp.equals(other.enableIngressTimestamp)) + return false; + return true; } @@ -2141,6 +2172,8 @@ public class AddressSettings implements Mergeable, Serializable expiryQueueSuffix + ", enableMetrics=" + enableMetrics + + ", enableIngressTime=" + + enableIngressTimestamp + "]"; } } diff --git a/artemis-server/src/main/resources/schema/artemis-configuration.xsd b/artemis-server/src/main/resources/schema/artemis-configuration.xsd index ad612d2f24..614e056a82 100644 --- a/artemis-server/src/main/resources/schema/artemis-configuration.xsd +++ b/artemis-server/src/main/resources/schema/artemis-configuration.xsd @@ -3970,6 +3970,14 @@ + + + + whether or not the broker should set its own timestamp on incoming messages to the matching address + + + + diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java index 7878098e72..07f11ab558 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java @@ -384,6 +384,7 @@ public class FileConfigurationTest extends ConfigurationImplTest { assertEquals(3, conf.getAddressesSettings().get("a1").getDefaultRingSize()); assertEquals(0, conf.getAddressesSettings().get("a1").getRetroactiveMessageCount()); assertTrue(conf.getAddressesSettings().get("a1").isEnableMetrics()); + assertTrue(conf.getAddressesSettings().get("a1").isEnableIngressTimestamp()); assertEquals("a2.1", conf.getAddressesSettings().get("a2").getDeadLetterAddress().toString()); assertEquals(true, conf.getAddressesSettings().get("a2").isAutoCreateDeadLetterResources()); @@ -420,6 +421,7 @@ public class FileConfigurationTest extends ConfigurationImplTest { assertEquals(-1, conf.getAddressesSettings().get("a2").getDefaultRingSize()); assertEquals(10, conf.getAddressesSettings().get("a2").getRetroactiveMessageCount()); assertFalse(conf.getAddressesSettings().get("a2").isEnableMetrics()); + assertFalse(conf.getAddressesSettings().get("a2").isEnableIngressTimestamp()); assertTrue(conf.getResourceLimitSettings().containsKey("myUser")); assertEquals(104, conf.getResourceLimitSettings().get("myUser").getMaxConnections()); diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/message/impl/MessagePropertyTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/message/impl/MessagePropertyTest.java index a31e09f254..0f20b82e1a 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/message/impl/MessagePropertyTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/message/impl/MessagePropertyTest.java @@ -101,6 +101,7 @@ public class MessagePropertyTest extends ActiveMQTestBase { assertEquals(floatValue(i), message.getFloatProperty("float").floatValue(), 0.001); assertEquals(new SimpleString(Integer.toString(i)), message.getSimpleStringProperty(SIMPLE_STRING_KEY.toString())); assertEqualsByteArrays(byteArray(i), message.getBytesProperty("byte[]")); + assertNull(message.getIngressTimestamp()); assertTrue(message.containsProperty("null-value")); assertEquals(message.getObjectProperty("null-value"), null); diff --git a/artemis-server/src/test/resources/ConfigurationTest-full-config.xml b/artemis-server/src/test/resources/ConfigurationTest-full-config.xml index a72459c5bd..e0a6a5ab99 100644 --- a/artemis-server/src/test/resources/ConfigurationTest-full-config.xml +++ b/artemis-server/src/test/resources/ConfigurationTest-full-config.xml @@ -473,6 +473,7 @@ ANYCAST MULTICAST 3 + true a2.1 diff --git a/artemis-server/src/test/resources/ConfigurationTest-xinclude-config-address-settings.xml b/artemis-server/src/test/resources/ConfigurationTest-xinclude-config-address-settings.xml index 846c0e5a5b..445c1842f6 100644 --- a/artemis-server/src/test/resources/ConfigurationTest-xinclude-config-address-settings.xml +++ b/artemis-server/src/test/resources/ConfigurationTest-xinclude-config-address-settings.xml @@ -44,6 +44,7 @@ ANYCAST MULTICAST 3 + true a2.1 diff --git a/docs/user-manual/en/address-model.md b/docs/user-manual/en/address-model.md index 599b871e5a..e3635afd14 100644 --- a/docs/user-manual/en/address-model.md +++ b/docs/user-manual/en/address-model.md @@ -691,6 +691,7 @@ that would be found in the `broker.xml` file. -1 0 true + false ``` @@ -991,3 +992,12 @@ queues created on the matching address. Defaults to 0. Read more about `enable-metrics` determines whether or not metrics will be published to any configured metrics plugin for the matching address. Default is `true`. Read more about [metrics](metrics.md). + +`enable-ingress-timestamp` determines whether or not the broker will add its time +to messages sent to the matching address. When `true` the exact behavior will +depend on the specific protocol in use. For AMQP messages the broker will add a +`long` *message annotation* named `x-opt-ingress-time`. For core messages (used by +the core and OpenWire protocols) the broker will add a long property named +`_AMQ_INGRESS_TIMESTAMP`. For STOMP messages the broker will add a frame header +named `ingress-timestamp`. The value will be the number of milliseconds since the +[epoch](https://en.wikipedia.org/wiki/Unix_time). Default is `false`. diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java index 7d65cdb627..7e154426b5 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java @@ -16,6 +16,12 @@ */ package org.apache.activemq.artemis.tests.integration.amqp; +import javax.jms.BytesMessage; +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.DeliveryMode; +import javax.jms.MessageProducer; +import javax.jms.Session; import javax.management.MBeanServer; import javax.management.MBeanServerFactory; import java.net.URI; @@ -25,10 +31,17 @@ import java.util.HashSet; import java.util.Map; import java.util.Set; +import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.artemis.api.core.QueueConfiguration; import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.TransportConfiguration; +import org.apache.activemq.artemis.api.core.client.ActiveMQClient; +import org.apache.activemq.artemis.api.core.client.ClientMessage; +import org.apache.activemq.artemis.api.core.client.ClientProducer; +import org.apache.activemq.artemis.api.core.client.ClientSession; +import org.apache.activemq.artemis.api.core.client.ClientSessionFactory; +import org.apache.activemq.artemis.api.core.client.ServerLocator; import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants; import org.apache.activemq.artemis.core.security.Role; import org.apache.activemq.artemis.core.server.ActiveMQServer; @@ -385,6 +398,10 @@ public class AmqpClientTestSupport extends AmqpTestSupport { } protected void sendMessages(String destinationName, int count, boolean durable) throws Exception { + sendMessages(destinationName, count, durable, null); + } + + protected void sendMessages(String destinationName, int count, boolean durable, byte[] payload) throws Exception { AmqpClient client = createAmqpClient(); AmqpConnection connection = addConnection(client.connect()); try { @@ -395,6 +412,9 @@ public class AmqpClientTestSupport extends AmqpTestSupport { AmqpMessage message = new AmqpMessage(); message.setMessageId("MessageID:" + i); message.setDurable(durable); + if (payload != null) { + message.setBytes(payload); + } sender.send(message); } } finally { @@ -402,6 +422,57 @@ public class AmqpClientTestSupport extends AmqpTestSupport { } } + protected void sendMessagesCore(String destinationName, int count, boolean durable) throws Exception { + sendMessagesCore(destinationName, count, durable, null); + } + + protected void sendMessagesCore(String destinationName, int count, boolean durable, byte[] body) throws Exception { + ServerLocator serverLocator = ActiveMQClient.createServerLocator("tcp://127.0.0.1:5672"); + ClientSessionFactory clientSessionFactory = serverLocator.createSessionFactory(); + ClientSession session = clientSessionFactory.createSession(); + try { + ClientProducer sender = session.createProducer(destinationName); + + for (int i = 0; i < count; ++i) { + ClientMessage message = session.createMessage(durable); + if (body != null) { + message.getBodyBuffer().writeBytes(body); + } + sender.send(message); + } + } finally { + session.close(); + } + } + + protected void sendMessagesOpenWire(String destinationName, int count, boolean durable) throws Exception { + sendMessagesOpenWire(destinationName, count, durable, null); + } + + protected void sendMessagesOpenWire(String destinationName, int count, boolean durable, byte[] payload) throws Exception { + ConnectionFactory cf = new ActiveMQConnectionFactory("tcp://127.0.0.1:5672"); + Connection connection = cf.createConnection(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + try { + MessageProducer producer = session.createProducer(session.createQueue(destinationName)); + if (durable) { + producer.setDeliveryMode(DeliveryMode.PERSISTENT); + } else { + producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); + } + + for (int i = 0; i < count; ++i) { + BytesMessage message = session.createBytesMessage(); + if (payload != null) { + message.writeBytes(payload); + } + producer.send(message); + } + } finally { + connection.close(); + } + } + protected Source createDynamicSource(boolean topic) { Source source = new Source(); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpIngressTimestampTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpIngressTimestampTest.java new file mode 100644 index 0000000000..798aa32c45 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpIngressTimestampTest.java @@ -0,0 +1,154 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.tests.integration.amqp; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import org.apache.activemq.artemis.api.core.QueueConfiguration; +import org.apache.activemq.artemis.api.core.RoutingType; +import org.apache.activemq.artemis.api.core.client.ActiveMQClient; +import org.apache.activemq.artemis.core.server.Queue; +import org.apache.activemq.artemis.core.settings.impl.AddressSettings; +import org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport; +import org.apache.activemq.artemis.tests.util.RandomUtil; +import org.apache.activemq.artemis.tests.util.Wait; +import org.apache.activemq.transport.amqp.client.AmqpClient; +import org.apache.activemq.transport.amqp.client.AmqpConnection; +import org.apache.activemq.transport.amqp.client.AmqpMessage; +import org.apache.activemq.transport.amqp.client.AmqpReceiver; +import org.apache.activemq.transport.amqp.client.AmqpSession; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +@RunWith(Parameterized.class) +public class AmqpIngressTimestampTest extends AmqpClientTestSupport { + + public int amqpMinLargeMessageSize = ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE; + + @Parameterized.Parameters(name = "restart={0}, large={1}") + public static Collection parameters() { + return Arrays.asList(new Object[][] { + {true, true}, + {false, false}, + {true, false}, + {false, true} + }); + } + + @Parameterized.Parameter(0) + public boolean restart; + + @Parameterized.Parameter(1) + public boolean large; + + @Test(timeout = 60000) + public void testIngressTimestampSendCore() throws Exception { + internalTestIngressTimestamp(Protocol.CORE); + } + + @Test(timeout = 60000) + public void testIngressTimestampSendAMQP() throws Exception { + internalTestIngressTimestamp(Protocol.AMQP); + } + + @Test(timeout = 60000) + public void testIngressTimestampSendOpenWire() throws Exception { + internalTestIngressTimestamp(Protocol.OPENWIRE); + } + + private void internalTestIngressTimestamp(Protocol protocol) throws Exception { + final String QUEUE_NAME = RandomUtil.randomString(); + server.createQueue(new QueueConfiguration(QUEUE_NAME).setRoutingType(RoutingType.ANYCAST)); + server.getAddressSettingsRepository().addMatch(QUEUE_NAME, new AddressSettings().setEnableIngressTimestamp(true)); + long beforeSend = System.currentTimeMillis(); + if (protocol == Protocol.CORE) { + sendMessagesCore(QUEUE_NAME, 1, true, getMessagePayload()); + } else if (protocol == Protocol.OPENWIRE) { + sendMessagesOpenWire(QUEUE_NAME, 1, true, getMessagePayload()); + } else { + sendMessages(QUEUE_NAME, 1, true, getMessagePayload()); + } + long afterSend = System.currentTimeMillis(); + + if (restart) { + server.stop(); + server.start(); + assertTrue(server.waitForActivation(3, TimeUnit.SECONDS)); + } + + AmqpClient client = createAmqpClient(); + AmqpConnection connection = addConnection(client.connect()); + AmqpSession session = connection.createSession(); + + AmqpReceiver receiver = session.createReceiver(QUEUE_NAME); + + Queue queueView = getProxyToQueue(QUEUE_NAME); + Wait.assertEquals(1L, queueView::getMessageCount, 2000, 100, false); + + receiver.flow(1); + AmqpMessage receive = receiver.receive(5, TimeUnit.SECONDS); + assertNotNull(receive); + instanceLog.info(receive); + Object ingressTimestampHeader = receive.getMessageAnnotation(AMQPMessageSupport.X_OPT_INGRESS_TIME); + assertNotNull(ingressTimestampHeader); + assertTrue(ingressTimestampHeader instanceof Long); + long ingressTimestamp = (Long) ingressTimestampHeader; + assertTrue("Ingress timstamp " + ingressTimestamp + " should be >= " + beforeSend + " and <= " + afterSend,ingressTimestamp >= beforeSend && ingressTimestamp <= afterSend); + receiver.close(); + + assertEquals(1, queueView.getMessageCount()); + + connection.close(); + } + + @Override + protected String getConfiguredProtocols() { + return "AMQP,OPENWIRE,CORE"; + } + + private enum Protocol { + CORE, AMQP, OPENWIRE + } + + @Override + protected void setData(AmqpMessage amqpMessage) throws Exception { + amqpMessage.setBytes(getMessagePayload()); + } + + @Override + protected void configureAMQPAcceptorParameters(Map params) { + params.put("amqpMinLargeMessageSize", ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE); + } + + private byte[] getMessagePayload() { + StringBuilder result = new StringBuilder(); + if (large) { + for (int i = 0; i < ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE * 20; i++) { + result.append("AB"); + } + } else { + result.append("AB"); + } + + return result.toString().getBytes(); + } +} diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/IngressTimestampTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/IngressTimestampTest.java new file mode 100644 index 0000000000..2f9d1adf72 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/IngressTimestampTest.java @@ -0,0 +1,207 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.client; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.DeliveryMode; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; +import java.lang.reflect.Method; +import java.util.Arrays; +import java.util.Collection; +import java.util.Enumeration; +import java.util.concurrent.TimeUnit; + +import org.apache.activemq.artemis.api.core.Message; +import org.apache.activemq.artemis.api.core.QueueConfiguration; +import org.apache.activemq.artemis.api.core.RoutingType; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.api.core.client.ActiveMQClient; +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.settings.impl.AddressSettings; +import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; +import org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport; +import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; +import org.apache.qpid.jms.JmsConnectionFactory; +import org.apache.qpid.jms.message.JmsTextMessage; +import org.apache.qpid.jms.provider.amqp.message.AmqpJmsMessageFacade; +import org.apache.qpid.proton.amqp.Symbol; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +@RunWith(Parameterized.class) +public class IngressTimestampTest extends ActiveMQTestBase { + private ActiveMQServer server; + + private final SimpleString QUEUE = new SimpleString("ConsumerTestQueue"); + + @Parameterized.Parameters(name = "restart={0}, large={1}") + public static Collection parameters() { + return Arrays.asList(new Object[][] { + {true, true}, + {false, false}, + {true, false}, + {false, true} + }); + } + + @Parameterized.Parameter(0) + public boolean restart; + + @Parameterized.Parameter(1) + public boolean large; + + @Before + @Override + public void setUp() throws Exception { + super.setUp(); + server = createServer(true, true); + server.start(); + server.getAddressSettingsRepository().addMatch("#", new AddressSettings().setEnableIngressTimestamp(true)); + server.createQueue(new QueueConfiguration(QUEUE).setRoutingType(RoutingType.ANYCAST)); + } + + @Test + public void testSendCoreReceiveAMQP() throws Throwable { + internalSendReceive(Protocol.CORE, Protocol.AMQP); + } + + @Test + public void testSendAMQPReceiveAMQP() throws Throwable { + internalSendReceive(Protocol.AMQP, Protocol.AMQP); + } + + @Test + public void testSendOpenWireReceiveAMQP() throws Throwable { + internalSendReceive(Protocol.OPENWIRE, Protocol.AMQP); + } + + @Test + public void testSendCoreReceiveCore() throws Throwable { + internalSendReceive(Protocol.CORE, Protocol.CORE); + } + + @Test + public void testSendAMQPReceiveCore() throws Throwable { + internalSendReceive(Protocol.AMQP, Protocol.CORE); + } + + @Test + public void testSendOpenWireReceiveCore() throws Throwable { + internalSendReceive(Protocol.OPENWIRE, Protocol.CORE); + } + + @Test + public void testSendCoreReceiveOpenwire() throws Throwable { + internalSendReceive(Protocol.CORE, Protocol.OPENWIRE); + } + + @Test + public void testSendAMQPReceiveOpenWire() throws Throwable { + internalSendReceive(Protocol.AMQP, Protocol.OPENWIRE); + } + + @Test + public void testSendOpenWireReceiveOpenWire() throws Throwable { + internalSendReceive(Protocol.OPENWIRE, Protocol.OPENWIRE); + } + + private void internalSendReceive(Protocol protocolSender, Protocol protocolConsumer) throws Throwable { + ConnectionFactory factorySend = createFactory(protocolSender); + ConnectionFactory factoryConsume = protocolConsumer == protocolSender ? factorySend : createFactory(protocolConsumer); + + long beforeSend, afterSend; + try (Connection connection = factorySend.createConnection()) { + try (Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)) { + javax.jms.Queue queue = session.createQueue(QUEUE.toString()); + try (MessageProducer producer = session.createProducer(queue)) { + producer.setDeliveryMode(DeliveryMode.PERSISTENT); + TextMessage msg = session.createTextMessage(getMessagePayload()); + beforeSend = System.currentTimeMillis(); + producer.send(msg); + afterSend = System.currentTimeMillis(); + } + } + } + + if (restart) { + server.stop(); + server.start(); + assertTrue(server.waitForActivation(3, TimeUnit.SECONDS)); + } + + try (Connection connection = factoryConsume.createConnection()) { + connection.start(); + try (Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)) { + javax.jms.Queue queue = session.createQueue(QUEUE.toString()); + try (MessageConsumer consumer = session.createConsumer(queue)) { + TextMessage message = (TextMessage) consumer.receive(1000); + Assert.assertNotNull(message); + Enumeration e = message.getPropertyNames(); + while (e.hasMoreElements()) { + System.out.println(e.nextElement()); + } + Object ingressTimestampHeader = null; + if (protocolConsumer == Protocol.AMQP) { + // Qpid JMS doesn't expose message annotations so we must use reflection here + Method getMessageAnnotation = AmqpJmsMessageFacade.class.getDeclaredMethod("getMessageAnnotation", Symbol.class); + getMessageAnnotation.setAccessible(true); + ingressTimestampHeader = getMessageAnnotation.invoke(((JmsTextMessage)message).getFacade(), Symbol.getSymbol(AMQPMessageSupport.X_OPT_INGRESS_TIME)); + } else { + ingressTimestampHeader = message.getObjectProperty(Message.HDR_INGRESS_TIMESTAMP.toString()); + } + assertNotNull(ingressTimestampHeader); + assertTrue(ingressTimestampHeader instanceof Long); + long ingressTimestamp = (Long) ingressTimestampHeader; + assertTrue("Ingress timstamp " + ingressTimestamp + " should be >= " + beforeSend + " and <= " + afterSend,ingressTimestamp >= beforeSend && ingressTimestamp <= afterSend); + } + } + } + } + + private String getMessagePayload() { + StringBuilder result = new StringBuilder(); + if (large) { + for (int i = 0; i < ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE * 20; i++) { + result.append("AB"); + } + } else { + result.append("AB"); + } + + return result.toString(); + } + + private ConnectionFactory createFactory(Protocol protocol) { + switch (protocol) { + case CORE: return new ActiveMQConnectionFactory(); // core protocol + case AMQP: return new JmsConnectionFactory("amqp://localhost:61616"); // amqp + case OPENWIRE: return new org.apache.activemq.ActiveMQConnectionFactory("tcp://localhost:61616"); // openwire + default: return null; + } + } + + private enum Protocol { + CORE, AMQP, OPENWIRE + } +} diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java index 24fc0877ba..69428e0bc6 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java @@ -637,6 +637,27 @@ public class StompTest extends StompTestBase { } + @Test + public void testIngressTimestamp() throws Exception { + server.getAddressSettingsRepository().addMatch("#", new AddressSettings().setEnableIngressTimestamp(true)); + conn.connect(defUser, defPass); + + subscribe(conn, null, Stomp.Headers.Subscribe.AckModeValues.AUTO); + long beforeSend = System.currentTimeMillis(); + sendJmsMessage(getName()); + long afterSend = System.currentTimeMillis(); + + ClientStompFrame frame = conn.receiveFrame(10000); + Assert.assertEquals(Stomp.Responses.MESSAGE, frame.getCommand()); + Assert.assertEquals(getQueuePrefix() + getQueueName(), frame.getHeader(Stomp.Headers.Send.DESTINATION)); + String ingressTimestampHeader = frame.getHeader(Stomp.Headers.Message.INGRESS_TIMESTAMP); + Assert.assertNotNull(ingressTimestampHeader); + long ingressTimestamp = Long.parseLong(ingressTimestampHeader); + assertTrue("Ingress timstamp " + ingressTimestamp + " should be >= " + beforeSend + " and <= " + afterSend,ingressTimestamp >= beforeSend && ingressTimestamp <= afterSend); + + conn.disconnect(); + } + @Test public void testAnycastDestinationTypeMessageProperty() throws Exception { conn.connect(defUser, defPass);