This closes #3279
This commit is contained in:
commit
b5f772e9e5
|
@ -161,6 +161,11 @@ public interface Message {
|
||||||
*/
|
*/
|
||||||
SimpleString HDR_ROUTING_TYPE = new SimpleString("_AMQ_ROUTING_TYPE");
|
SimpleString HDR_ROUTING_TYPE = new SimpleString("_AMQ_ROUTING_TYPE");
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The time at which the message arrived at the broker.
|
||||||
|
*/
|
||||||
|
SimpleString HDR_INGRESS_TIMESTAMP = new SimpleString("_AMQ_INGRESS_TIMESTAMP");
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The prefix used (if any) when sending this message. For protocols (e.g. STOMP) that need to track this and restore
|
* The prefix used (if any) when sending this message. For protocols (e.g. STOMP) that need to track this and restore
|
||||||
* the prefix when the message is consumed.
|
* the prefix when the message is consumed.
|
||||||
|
@ -643,6 +648,15 @@ public interface Message {
|
||||||
return getObjectProperty(key);
|
return getObjectProperty(key);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
default Message setIngressTimestamp() {
|
||||||
|
setBrokerProperty(HDR_INGRESS_TIMESTAMP, System.currentTimeMillis());
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
default Long getIngressTimestamp() {
|
||||||
|
return (Long) getBrokerProperty(HDR_INGRESS_TIMESTAMP);
|
||||||
|
}
|
||||||
|
|
||||||
Short getShortProperty(SimpleString key) throws ActiveMQPropertyConversionException;
|
Short getShortProperty(SimpleString key) throws ActiveMQPropertyConversionException;
|
||||||
|
|
||||||
Float getFloatProperty(SimpleString key) throws ActiveMQPropertyConversionException;
|
Float getFloatProperty(SimpleString key) throws ActiveMQPropertyConversionException;
|
||||||
|
|
|
@ -69,6 +69,7 @@ public class AMQPLargeMessage extends AMQPMessage implements LargeServerMessage
|
||||||
reader.readInto(wrapbuffer);
|
reader.readInto(wrapbuffer);
|
||||||
|
|
||||||
AMQPStandardMessage standardMessage = new AMQPStandardMessage(messageFormat, buffer, extraProperties, coreMessageObjectPools);
|
AMQPStandardMessage standardMessage = new AMQPStandardMessage(messageFormat, buffer, extraProperties, coreMessageObjectPools);
|
||||||
|
standardMessage.setMessageAnnotations(messageAnnotations);
|
||||||
standardMessage.setMessageID(messageID);
|
standardMessage.setMessageID(messageID);
|
||||||
return standardMessage.toCore();
|
return standardMessage.toCore();
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
|
|
|
@ -262,6 +262,10 @@ public abstract class AMQPMessage extends RefCountMessage implements org.apache.
|
||||||
return applicationProperties;
|
return applicationProperties;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected MessageAnnotations getDecodedMessageAnnotations() {
|
||||||
|
return messageAnnotations;
|
||||||
|
}
|
||||||
|
|
||||||
protected abstract ReadableBuffer getData();
|
protected abstract ReadableBuffer getData();
|
||||||
|
|
||||||
// Access to the AMQP message data using safe copies freshly decoded from the current
|
// Access to the AMQP message data using safe copies freshly decoded from the current
|
||||||
|
@ -586,6 +590,10 @@ public abstract class AMQPMessage extends RefCountMessage implements org.apache.
|
||||||
getMessageAnnotationsMap(true).put(annotation, value);
|
getMessageAnnotationsMap(true).put(annotation, value);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected void setMessageAnnotations(MessageAnnotations messageAnnotations) {
|
||||||
|
this.messageAnnotations = messageAnnotations;
|
||||||
|
}
|
||||||
|
|
||||||
// Message decoding and copying methods. Care must be taken here to ensure the buffer and the
|
// Message decoding and copying methods. Care must be taken here to ensure the buffer and the
|
||||||
// state tracking information is kept up to data. When the message is manually changed a forced
|
// state tracking information is kept up to data. When the message is manually changed a forced
|
||||||
// re-encode should be done to update the backing data with the in memory elements.
|
// re-encode should be done to update the backing data with the in memory elements.
|
||||||
|
@ -1351,6 +1359,17 @@ public abstract class AMQPMessage extends RefCountMessage implements org.apache.
|
||||||
return extra.getProperty(key);
|
return extra.getProperty(key);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public final org.apache.activemq.artemis.api.core.Message setIngressTimestamp() {
|
||||||
|
setMessageAnnotation(AMQPMessageSupport.INGRESS_TIME_MSG_ANNOTATION, System.currentTimeMillis());
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Long getIngressTimestamp() {
|
||||||
|
return (Long) getMessageAnnotation(AMQPMessageSupport.INGRESS_TIME_MSG_ANNOTATION);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
// JMS Style property access methods. These can result in additional decode of AMQP message
|
// JMS Style property access methods. These can result in additional decode of AMQP message
|
||||||
// data from Application properties. Updates to application properties puts the message in a
|
// data from Application properties. Updates to application properties puts the message in a
|
||||||
|
|
|
@ -20,6 +20,7 @@ package org.apache.activemq.artemis.protocol.amqp.broker;
|
||||||
import org.apache.qpid.proton.amqp.Symbol;
|
import org.apache.qpid.proton.amqp.Symbol;
|
||||||
import org.apache.qpid.proton.amqp.messaging.ApplicationProperties;
|
import org.apache.qpid.proton.amqp.messaging.ApplicationProperties;
|
||||||
import org.apache.qpid.proton.amqp.messaging.Header;
|
import org.apache.qpid.proton.amqp.messaging.Header;
|
||||||
|
import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
|
||||||
import org.apache.qpid.proton.amqp.messaging.Properties;
|
import org.apache.qpid.proton.amqp.messaging.Properties;
|
||||||
|
|
||||||
/** <b>Warning:</b> do not use this class outside of the broker implementation.
|
/** <b>Warning:</b> do not use this class outside of the broker implementation.
|
||||||
|
@ -56,4 +57,8 @@ public class AMQPMessageBrokerAccessor {
|
||||||
return message.getCurrentProperties();
|
return message.getCurrentProperties();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static MessageAnnotations getDecodedMessageAnnotations(AMQPMessage message) {
|
||||||
|
return message.getDecodedMessageAnnotations();
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -183,6 +183,8 @@ public final class AMQPMessageSupport {
|
||||||
public static final String X_OPT_PREFIX = "x-opt-";
|
public static final String X_OPT_PREFIX = "x-opt-";
|
||||||
public static final String AMQ_PROPERTY_PREFIX = "_AMQ_";
|
public static final String AMQ_PROPERTY_PREFIX = "_AMQ_";
|
||||||
|
|
||||||
|
public static final String X_OPT_INGRESS_TIME = X_OPT_PREFIX + "ingress-time";
|
||||||
|
|
||||||
public static final short AMQP_UNKNOWN = 0;
|
public static final short AMQP_UNKNOWN = 0;
|
||||||
public static final short AMQP_NULL = 1;
|
public static final short AMQP_NULL = 1;
|
||||||
public static final short AMQP_DATA = 2;
|
public static final short AMQP_DATA = 2;
|
||||||
|
@ -195,6 +197,7 @@ public final class AMQPMessageSupport {
|
||||||
|
|
||||||
public static final Symbol JMS_DEST_TYPE_MSG_ANNOTATION = getSymbol("x-opt-jms-dest");
|
public static final Symbol JMS_DEST_TYPE_MSG_ANNOTATION = getSymbol("x-opt-jms-dest");
|
||||||
public static final Symbol JMS_REPLY_TO_TYPE_MSG_ANNOTATION = getSymbol("x-opt-jms-reply-to");
|
public static final Symbol JMS_REPLY_TO_TYPE_MSG_ANNOTATION = getSymbol("x-opt-jms-reply-to");
|
||||||
|
public static final Symbol INGRESS_TIME_MSG_ANNOTATION = getSymbol(X_OPT_INGRESS_TIME);
|
||||||
|
|
||||||
public static final byte QUEUE_TYPE = 0x00;
|
public static final byte QUEUE_TYPE = 0x00;
|
||||||
public static final byte TOPIC_TYPE = 0x01;
|
public static final byte TOPIC_TYPE = 0x01;
|
||||||
|
|
|
@ -17,6 +17,7 @@
|
||||||
|
|
||||||
package org.apache.activemq.artemis.protocol.amqp.converter;
|
package org.apache.activemq.artemis.protocol.amqp.converter;
|
||||||
|
|
||||||
|
import static org.apache.activemq.artemis.api.core.Message.HDR_INGRESS_TIMESTAMP;
|
||||||
import static org.apache.activemq.artemis.api.core.Message.HDR_SCHEDULED_DELIVERY_TIME;
|
import static org.apache.activemq.artemis.api.core.Message.HDR_SCHEDULED_DELIVERY_TIME;
|
||||||
import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.AMQP_DATA;
|
import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.AMQP_DATA;
|
||||||
import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.AMQP_NULL;
|
import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.AMQP_NULL;
|
||||||
|
@ -292,6 +293,8 @@ public class AmqpCoreConverter {
|
||||||
if (delay > 0) {
|
if (delay > 0) {
|
||||||
jms.setLongProperty(HDR_SCHEDULED_DELIVERY_TIME.toString(), System.currentTimeMillis() + delay);
|
jms.setLongProperty(HDR_SCHEDULED_DELIVERY_TIME.toString(), System.currentTimeMillis() + delay);
|
||||||
}
|
}
|
||||||
|
} else if (AMQPMessageSupport.X_OPT_INGRESS_TIME.equals(key) && entry.getValue() != null) {
|
||||||
|
jms.setLongProperty(HDR_INGRESS_TIMESTAMP.toString(), ((Number) entry.getValue()).longValue());
|
||||||
}
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
|
|
|
@ -295,6 +295,9 @@ public class CoreAmqpConverter {
|
||||||
} else if (key.endsWith(HDR_SCHEDULED_DELIVERY_TIME.toString())) {
|
} else if (key.endsWith(HDR_SCHEDULED_DELIVERY_TIME.toString())) {
|
||||||
// skip..remove annotation from previous inbound transformation
|
// skip..remove annotation from previous inbound transformation
|
||||||
continue;
|
continue;
|
||||||
|
} else if (key.equals(Message.HDR_INGRESS_TIMESTAMP.toString())) {
|
||||||
|
maMap.put(AMQPMessageSupport.INGRESS_TIME_MSG_ANNOTATION, message.getLongProperty(key));
|
||||||
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (apMap == null) {
|
if (apMap == null) {
|
||||||
|
|
|
@ -70,6 +70,7 @@ import org.apache.qpid.proton.amqp.messaging.Accepted;
|
||||||
import org.apache.qpid.proton.amqp.messaging.ApplicationProperties;
|
import org.apache.qpid.proton.amqp.messaging.ApplicationProperties;
|
||||||
import org.apache.qpid.proton.amqp.messaging.DeliveryAnnotations;
|
import org.apache.qpid.proton.amqp.messaging.DeliveryAnnotations;
|
||||||
import org.apache.qpid.proton.amqp.messaging.Header;
|
import org.apache.qpid.proton.amqp.messaging.Header;
|
||||||
|
import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
|
||||||
import org.apache.qpid.proton.amqp.messaging.Modified;
|
import org.apache.qpid.proton.amqp.messaging.Modified;
|
||||||
import org.apache.qpid.proton.amqp.messaging.Outcome;
|
import org.apache.qpid.proton.amqp.messaging.Outcome;
|
||||||
import org.apache.qpid.proton.amqp.messaging.Properties;
|
import org.apache.qpid.proton.amqp.messaging.Properties;
|
||||||
|
@ -701,7 +702,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
|
||||||
try {
|
try {
|
||||||
int proposedPosition = writeHeaderAndAnnotations(context, deliveryAnnotationsToEncode);
|
int proposedPosition = writeHeaderAndAnnotations(context, deliveryAnnotationsToEncode);
|
||||||
if (message.isReencoded()) {
|
if (message.isReencoded()) {
|
||||||
proposedPosition = writePropertiesAndApplicationProperties(context, message);
|
proposedPosition = writeMessageAnnotationsPropertiesAndApplicationProperties(context, message);
|
||||||
}
|
}
|
||||||
|
|
||||||
context.position(proposedPosition);
|
context.position(proposedPosition);
|
||||||
|
@ -716,14 +717,20 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
|
||||||
/**
|
/**
|
||||||
* Write properties and application properties when the message is flagged as re-encoded.
|
* Write properties and application properties when the message is flagged as re-encoded.
|
||||||
*/
|
*/
|
||||||
private int writePropertiesAndApplicationProperties(LargeBodyReader context, AMQPLargeMessage message) throws Exception {
|
private int writeMessageAnnotationsPropertiesAndApplicationProperties(LargeBodyReader context, AMQPLargeMessage message) throws Exception {
|
||||||
int bodyPosition = AMQPMessageBrokerAccessor.getRemainingBodyPosition(message);
|
int bodyPosition = AMQPMessageBrokerAccessor.getRemainingBodyPosition(message);
|
||||||
assert bodyPosition > 0;
|
assert bodyPosition > 0;
|
||||||
writePropertiesAndApplicationPropertiesInternal(message);
|
writeMessageAnnotationsPropertiesAndApplicationPropertiesInternal(message);
|
||||||
return bodyPosition;
|
return bodyPosition;
|
||||||
}
|
}
|
||||||
|
|
||||||
private void writePropertiesAndApplicationPropertiesInternal(AMQPLargeMessage message) {
|
private void writeMessageAnnotationsPropertiesAndApplicationPropertiesInternal(AMQPLargeMessage message) {
|
||||||
|
MessageAnnotations messageAnnotations = AMQPMessageBrokerAccessor.getDecodedMessageAnnotations(message);
|
||||||
|
|
||||||
|
if (messageAnnotations != null) {
|
||||||
|
TLSEncode.getEncoder().writeObject(messageAnnotations);
|
||||||
|
}
|
||||||
|
|
||||||
Properties amqpProperties = AMQPMessageBrokerAccessor.getCurrentProperties(message);
|
Properties amqpProperties = AMQPMessageBrokerAccessor.getCurrentProperties(message);
|
||||||
if (amqpProperties != null) {
|
if (amqpProperties != null) {
|
||||||
TLSEncode.getEncoder().writeObject(amqpProperties);
|
TLSEncode.getEncoder().writeObject(amqpProperties);
|
||||||
|
|
|
@ -686,6 +686,11 @@ public final class OpenWireMessageConverter {
|
||||||
setAMQMsgHdrLastValueName(amqMsg, lastValueProperty);
|
setAMQMsgHdrLastValueName(amqMsg, lastValueProperty);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
final Long ingressTimestamp = coreMessage.getPropertyNames().contains(org.apache.activemq.artemis.api.core.Message.HDR_INGRESS_TIMESTAMP) ? coreMessage.getLongProperty(org.apache.activemq.artemis.api.core.Message.HDR_INGRESS_TIMESTAMP) : null;
|
||||||
|
if (ingressTimestamp != null) {
|
||||||
|
setAMQMsgHdrIngressTimestamp(amqMsg, ingressTimestamp);
|
||||||
|
}
|
||||||
|
|
||||||
final Set<SimpleString> props = coreMessage.getPropertyNames();
|
final Set<SimpleString> props = coreMessage.getPropertyNames();
|
||||||
if (props != null) {
|
if (props != null) {
|
||||||
setAMQMsgObjectProperties(amqMsg, coreMessage, props);
|
setAMQMsgObjectProperties(amqMsg, coreMessage, props);
|
||||||
|
@ -937,6 +942,15 @@ public final class OpenWireMessageConverter {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static void setAMQMsgHdrIngressTimestamp(final ActiveMQMessage amqMsg,
|
||||||
|
final Long ingressTimestamp) throws IOException {
|
||||||
|
try {
|
||||||
|
amqMsg.setLongProperty(org.apache.activemq.artemis.api.core.Message.HDR_INGRESS_TIMESTAMP.toString(), ingressTimestamp);
|
||||||
|
} catch (JMSException e) {
|
||||||
|
throw new IOException("failure to set ingress timestamp property " + ingressTimestamp, e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private static void setAMQMsgObjectProperties(final ActiveMQMessage amqMsg,
|
private static void setAMQMsgObjectProperties(final ActiveMQMessage amqMsg,
|
||||||
final ICoreMessage coreMessage,
|
final ICoreMessage coreMessage,
|
||||||
final Set<SimpleString> props) throws IOException {
|
final Set<SimpleString> props) throws IOException {
|
||||||
|
|
|
@ -135,6 +135,8 @@ public interface Stomp {
|
||||||
String PERSISTENT = "persistent";
|
String PERSISTENT = "persistent";
|
||||||
|
|
||||||
String VALIDATED_USER = "JMSXUserID";
|
String VALIDATED_USER = "JMSXUserID";
|
||||||
|
|
||||||
|
String INGRESS_TIMESTAMP = "ingress-timestamp";
|
||||||
}
|
}
|
||||||
|
|
||||||
interface Subscribe {
|
interface Subscribe {
|
||||||
|
|
|
@ -132,6 +132,9 @@ public class StompUtils {
|
||||||
if (message.containsProperty(Message.HDR_ROUTING_TYPE)) {
|
if (message.containsProperty(Message.HDR_ROUTING_TYPE)) {
|
||||||
command.addHeader(Stomp.Headers.Send.DESTINATION_TYPE, RoutingType.getType(message.getByteProperty(Message.HDR_ROUTING_TYPE.toString())).toString());
|
command.addHeader(Stomp.Headers.Send.DESTINATION_TYPE, RoutingType.getType(message.getByteProperty(Message.HDR_ROUTING_TYPE.toString())).toString());
|
||||||
}
|
}
|
||||||
|
if (message.containsProperty(Message.HDR_INGRESS_TIMESTAMP)) {
|
||||||
|
command.addHeader(Stomp.Headers.Message.INGRESS_TIMESTAMP, Long.toString(message.getLongProperty(Message.HDR_INGRESS_TIMESTAMP)));
|
||||||
|
}
|
||||||
|
|
||||||
// now let's add all the rest of the message headers
|
// now let's add all the rest of the message headers
|
||||||
Set<SimpleString> names = message.getPropertyNames();
|
Set<SimpleString> names = message.getPropertyNames();
|
||||||
|
|
|
@ -311,6 +311,7 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
|
||||||
|
|
||||||
private static final String ENABLE_METRICS = "enable-metrics";
|
private static final String ENABLE_METRICS = "enable-metrics";
|
||||||
|
|
||||||
|
private static final String ENABLE_INGRESS_TIMESTAMP = "enable-ingress-timestamp";
|
||||||
|
|
||||||
// Attributes ----------------------------------------------------
|
// Attributes ----------------------------------------------------
|
||||||
|
|
||||||
|
@ -1361,6 +1362,8 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
|
||||||
addressSettings.setExpiryQueueSuffix(new SimpleString(getTrimmedTextContent(child)));
|
addressSettings.setExpiryQueueSuffix(new SimpleString(getTrimmedTextContent(child)));
|
||||||
} else if (ENABLE_METRICS.equalsIgnoreCase(name)) {
|
} else if (ENABLE_METRICS.equalsIgnoreCase(name)) {
|
||||||
addressSettings.setEnableMetrics(XMLUtil.parseBoolean(child));
|
addressSettings.setEnableMetrics(XMLUtil.parseBoolean(child));
|
||||||
|
} else if (ENABLE_INGRESS_TIMESTAMP.equalsIgnoreCase(name)) {
|
||||||
|
addressSettings.setEnableIngressTimestamp(XMLUtil.parseBoolean(child));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return setting;
|
return setting;
|
||||||
|
|
|
@ -2181,6 +2181,11 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
|
||||||
throw ActiveMQMessageBundle.BUNDLE.rejectEmptyValidatedUser();
|
throw ActiveMQMessageBundle.BUNDLE.rejectEmptyValidatedUser();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (server.getAddressSettingsRepository().getMatch(msg.getAddress()).isEnableIngressTimestamp()) {
|
||||||
|
msg.setIngressTimestamp();
|
||||||
|
msg.reencode();
|
||||||
|
}
|
||||||
|
|
||||||
if (tx == null || autoCommitSends) {
|
if (tx == null || autoCommitSends) {
|
||||||
routingContext.setTransaction(null);
|
routingContext.setTransaction(null);
|
||||||
} else {
|
} else {
|
||||||
|
|
|
@ -133,6 +133,8 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
|
||||||
|
|
||||||
public static final SlowConsumerThresholdMeasurementUnit DEFAULT_SLOW_CONSUMER_THRESHOLD_MEASUREMENT_UNIT = SlowConsumerThresholdMeasurementUnit.MESSAGES_PER_SECOND;
|
public static final SlowConsumerThresholdMeasurementUnit DEFAULT_SLOW_CONSUMER_THRESHOLD_MEASUREMENT_UNIT = SlowConsumerThresholdMeasurementUnit.MESSAGES_PER_SECOND;
|
||||||
|
|
||||||
|
public static final boolean DEFAULT_ENABLE_INGRESS_TIMESTAMP = false;
|
||||||
|
|
||||||
private AddressFullMessagePolicy addressFullMessagePolicy = null;
|
private AddressFullMessagePolicy addressFullMessagePolicy = null;
|
||||||
|
|
||||||
private Long maxSizeBytes = null;
|
private Long maxSizeBytes = null;
|
||||||
|
@ -265,6 +267,8 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
|
||||||
|
|
||||||
private Integer managementMessageAttributeSizeLimit = null;
|
private Integer managementMessageAttributeSizeLimit = null;
|
||||||
|
|
||||||
|
private Boolean enableIngressTimestamp = null;
|
||||||
|
|
||||||
//from amq5
|
//from amq5
|
||||||
//make it transient
|
//make it transient
|
||||||
private transient Integer queuePrefetch = null;
|
private transient Integer queuePrefetch = null;
|
||||||
|
@ -332,6 +336,7 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
|
||||||
this.enableMetrics = other.enableMetrics;
|
this.enableMetrics = other.enableMetrics;
|
||||||
this.managementMessageAttributeSizeLimit = other.managementMessageAttributeSizeLimit;
|
this.managementMessageAttributeSizeLimit = other.managementMessageAttributeSizeLimit;
|
||||||
this.slowConsumerThresholdMeasurementUnit = other.slowConsumerThresholdMeasurementUnit;
|
this.slowConsumerThresholdMeasurementUnit = other.slowConsumerThresholdMeasurementUnit;
|
||||||
|
this.enableIngressTimestamp = other.enableIngressTimestamp;
|
||||||
}
|
}
|
||||||
|
|
||||||
public AddressSettings() {
|
public AddressSettings() {
|
||||||
|
@ -955,6 +960,15 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public boolean isEnableIngressTimestamp() {
|
||||||
|
return enableIngressTimestamp != null ? enableIngressTimestamp : AddressSettings.DEFAULT_ENABLE_INGRESS_TIMESTAMP;
|
||||||
|
}
|
||||||
|
|
||||||
|
public AddressSettings setEnableIngressTimestamp(final boolean enableIngressTimestamp) {
|
||||||
|
this.enableIngressTimestamp = enableIngressTimestamp;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* merge 2 objects in to 1
|
* merge 2 objects in to 1
|
||||||
*
|
*
|
||||||
|
@ -1154,6 +1168,9 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
|
||||||
if (enableMetrics == null) {
|
if (enableMetrics == null) {
|
||||||
enableMetrics = merged.enableMetrics;
|
enableMetrics = merged.enableMetrics;
|
||||||
}
|
}
|
||||||
|
if (enableIngressTimestamp == null) {
|
||||||
|
enableIngressTimestamp = merged.enableIngressTimestamp;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -1377,6 +1394,10 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
|
||||||
slowConsumerThresholdMeasurementUnit = SlowConsumerThresholdMeasurementUnit.valueOf(slowConsumerMeasurementUnitEnumValue);
|
slowConsumerThresholdMeasurementUnit = SlowConsumerThresholdMeasurementUnit.valueOf(slowConsumerMeasurementUnitEnumValue);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (buffer.readableBytes() > 0) {
|
||||||
|
enableIngressTimestamp = BufferHelper.readNullableBoolean(buffer);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -1442,7 +1463,8 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
|
||||||
BufferHelper.sizeOfNullableBoolean(enableMetrics) +
|
BufferHelper.sizeOfNullableBoolean(enableMetrics) +
|
||||||
BufferHelper.sizeOfNullableBoolean(defaultGroupRebalancePauseDispatch) +
|
BufferHelper.sizeOfNullableBoolean(defaultGroupRebalancePauseDispatch) +
|
||||||
BufferHelper.sizeOfNullableInteger(managementMessageAttributeSizeLimit) +
|
BufferHelper.sizeOfNullableInteger(managementMessageAttributeSizeLimit) +
|
||||||
BufferHelper.sizeOfNullableInteger(slowConsumerThresholdMeasurementUnit.getValue());
|
BufferHelper.sizeOfNullableInteger(slowConsumerThresholdMeasurementUnit.getValue()) +
|
||||||
|
BufferHelper.sizeOfNullableBoolean(enableIngressTimestamp);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -1572,6 +1594,8 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
|
||||||
BufferHelper.writeNullableInteger(buffer, managementMessageAttributeSizeLimit);
|
BufferHelper.writeNullableInteger(buffer, managementMessageAttributeSizeLimit);
|
||||||
|
|
||||||
BufferHelper.writeNullableInteger(buffer, slowConsumerThresholdMeasurementUnit == null ? null : slowConsumerThresholdMeasurementUnit.getValue());
|
BufferHelper.writeNullableInteger(buffer, slowConsumerThresholdMeasurementUnit == null ? null : slowConsumerThresholdMeasurementUnit.getValue());
|
||||||
|
|
||||||
|
BufferHelper.writeNullableBoolean(buffer, enableIngressTimestamp);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* (non-Javadoc)
|
/* (non-Javadoc)
|
||||||
|
@ -1646,6 +1670,7 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
|
||||||
result = prime * result + ((enableMetrics == null) ? 0 : enableMetrics.hashCode());
|
result = prime * result + ((enableMetrics == null) ? 0 : enableMetrics.hashCode());
|
||||||
result = prime * result + ((managementMessageAttributeSizeLimit == null) ? 0 : managementMessageAttributeSizeLimit.hashCode());
|
result = prime * result + ((managementMessageAttributeSizeLimit == null) ? 0 : managementMessageAttributeSizeLimit.hashCode());
|
||||||
result = prime * result + ((slowConsumerThresholdMeasurementUnit == null) ? 0 : slowConsumerThresholdMeasurementUnit.hashCode());
|
result = prime * result + ((slowConsumerThresholdMeasurementUnit == null) ? 0 : slowConsumerThresholdMeasurementUnit.hashCode());
|
||||||
|
result = prime * result + ((enableIngressTimestamp == null) ? 0 : enableIngressTimestamp.hashCode());
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2006,6 +2031,12 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
|
||||||
if (slowConsumerThresholdMeasurementUnit != other.slowConsumerThresholdMeasurementUnit)
|
if (slowConsumerThresholdMeasurementUnit != other.slowConsumerThresholdMeasurementUnit)
|
||||||
return false;
|
return false;
|
||||||
|
|
||||||
|
if (enableIngressTimestamp == null) {
|
||||||
|
if (other.enableIngressTimestamp != null)
|
||||||
|
return false;
|
||||||
|
} else if (!enableIngressTimestamp.equals(other.enableIngressTimestamp))
|
||||||
|
return false;
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2141,6 +2172,8 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
|
||||||
expiryQueueSuffix +
|
expiryQueueSuffix +
|
||||||
", enableMetrics=" +
|
", enableMetrics=" +
|
||||||
enableMetrics +
|
enableMetrics +
|
||||||
|
", enableIngressTime=" +
|
||||||
|
enableIngressTimestamp +
|
||||||
"]";
|
"]";
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -3970,6 +3970,14 @@
|
||||||
</xsd:annotation>
|
</xsd:annotation>
|
||||||
</xsd:element>
|
</xsd:element>
|
||||||
|
|
||||||
|
<xsd:element name="enable-ingress-timestamp" type="xsd:boolean" default="false" maxOccurs="1" minOccurs="0">
|
||||||
|
<xsd:annotation>
|
||||||
|
<xsd:documentation>
|
||||||
|
whether or not the broker should set its own timestamp on incoming messages to the matching address
|
||||||
|
</xsd:documentation>
|
||||||
|
</xsd:annotation>
|
||||||
|
</xsd:element>
|
||||||
|
|
||||||
</xsd:all>
|
</xsd:all>
|
||||||
|
|
||||||
<xsd:attribute name="match" type="xsd:string" use="required">
|
<xsd:attribute name="match" type="xsd:string" use="required">
|
||||||
|
|
|
@ -384,6 +384,7 @@ public class FileConfigurationTest extends ConfigurationImplTest {
|
||||||
assertEquals(3, conf.getAddressesSettings().get("a1").getDefaultRingSize());
|
assertEquals(3, conf.getAddressesSettings().get("a1").getDefaultRingSize());
|
||||||
assertEquals(0, conf.getAddressesSettings().get("a1").getRetroactiveMessageCount());
|
assertEquals(0, conf.getAddressesSettings().get("a1").getRetroactiveMessageCount());
|
||||||
assertTrue(conf.getAddressesSettings().get("a1").isEnableMetrics());
|
assertTrue(conf.getAddressesSettings().get("a1").isEnableMetrics());
|
||||||
|
assertTrue(conf.getAddressesSettings().get("a1").isEnableIngressTimestamp());
|
||||||
|
|
||||||
assertEquals("a2.1", conf.getAddressesSettings().get("a2").getDeadLetterAddress().toString());
|
assertEquals("a2.1", conf.getAddressesSettings().get("a2").getDeadLetterAddress().toString());
|
||||||
assertEquals(true, conf.getAddressesSettings().get("a2").isAutoCreateDeadLetterResources());
|
assertEquals(true, conf.getAddressesSettings().get("a2").isAutoCreateDeadLetterResources());
|
||||||
|
@ -420,6 +421,7 @@ public class FileConfigurationTest extends ConfigurationImplTest {
|
||||||
assertEquals(-1, conf.getAddressesSettings().get("a2").getDefaultRingSize());
|
assertEquals(-1, conf.getAddressesSettings().get("a2").getDefaultRingSize());
|
||||||
assertEquals(10, conf.getAddressesSettings().get("a2").getRetroactiveMessageCount());
|
assertEquals(10, conf.getAddressesSettings().get("a2").getRetroactiveMessageCount());
|
||||||
assertFalse(conf.getAddressesSettings().get("a2").isEnableMetrics());
|
assertFalse(conf.getAddressesSettings().get("a2").isEnableMetrics());
|
||||||
|
assertFalse(conf.getAddressesSettings().get("a2").isEnableIngressTimestamp());
|
||||||
|
|
||||||
assertTrue(conf.getResourceLimitSettings().containsKey("myUser"));
|
assertTrue(conf.getResourceLimitSettings().containsKey("myUser"));
|
||||||
assertEquals(104, conf.getResourceLimitSettings().get("myUser").getMaxConnections());
|
assertEquals(104, conf.getResourceLimitSettings().get("myUser").getMaxConnections());
|
||||||
|
|
|
@ -101,6 +101,7 @@ public class MessagePropertyTest extends ActiveMQTestBase {
|
||||||
assertEquals(floatValue(i), message.getFloatProperty("float").floatValue(), 0.001);
|
assertEquals(floatValue(i), message.getFloatProperty("float").floatValue(), 0.001);
|
||||||
assertEquals(new SimpleString(Integer.toString(i)), message.getSimpleStringProperty(SIMPLE_STRING_KEY.toString()));
|
assertEquals(new SimpleString(Integer.toString(i)), message.getSimpleStringProperty(SIMPLE_STRING_KEY.toString()));
|
||||||
assertEqualsByteArrays(byteArray(i), message.getBytesProperty("byte[]"));
|
assertEqualsByteArrays(byteArray(i), message.getBytesProperty("byte[]"));
|
||||||
|
assertNull(message.getIngressTimestamp());
|
||||||
|
|
||||||
assertTrue(message.containsProperty("null-value"));
|
assertTrue(message.containsProperty("null-value"));
|
||||||
assertEquals(message.getObjectProperty("null-value"), null);
|
assertEquals(message.getObjectProperty("null-value"), null);
|
||||||
|
|
|
@ -473,6 +473,7 @@
|
||||||
<default-queue-routing-type>ANYCAST</default-queue-routing-type>
|
<default-queue-routing-type>ANYCAST</default-queue-routing-type>
|
||||||
<default-address-routing-type>MULTICAST</default-address-routing-type>
|
<default-address-routing-type>MULTICAST</default-address-routing-type>
|
||||||
<default-ring-size>3</default-ring-size>
|
<default-ring-size>3</default-ring-size>
|
||||||
|
<enable-ingress-timestamp>true</enable-ingress-timestamp>
|
||||||
</address-setting>
|
</address-setting>
|
||||||
<address-setting match="a2">
|
<address-setting match="a2">
|
||||||
<dead-letter-address>a2.1</dead-letter-address>
|
<dead-letter-address>a2.1</dead-letter-address>
|
||||||
|
|
|
@ -44,6 +44,7 @@
|
||||||
<default-queue-routing-type>ANYCAST</default-queue-routing-type>
|
<default-queue-routing-type>ANYCAST</default-queue-routing-type>
|
||||||
<default-address-routing-type>MULTICAST</default-address-routing-type>
|
<default-address-routing-type>MULTICAST</default-address-routing-type>
|
||||||
<default-ring-size>3</default-ring-size>
|
<default-ring-size>3</default-ring-size>
|
||||||
|
<enable-ingress-timestamp>true</enable-ingress-timestamp>
|
||||||
</address-setting>
|
</address-setting>
|
||||||
<address-setting match="a2">
|
<address-setting match="a2">
|
||||||
<dead-letter-address>a2.1</dead-letter-address>
|
<dead-letter-address>a2.1</dead-letter-address>
|
||||||
|
|
|
@ -691,6 +691,7 @@ that would be found in the `broker.xml` file.
|
||||||
<default-ring-size>-1</default-ring-size>
|
<default-ring-size>-1</default-ring-size>
|
||||||
<retroactive-message-count>0</retroactive-message-count>
|
<retroactive-message-count>0</retroactive-message-count>
|
||||||
<enable-metrics>true</enable-metrics>
|
<enable-metrics>true</enable-metrics>
|
||||||
|
<enable-ingress-timestamp>false</enable-ingress-timestamp>
|
||||||
</address-setting>
|
</address-setting>
|
||||||
</address-settings>
|
</address-settings>
|
||||||
```
|
```
|
||||||
|
@ -991,3 +992,12 @@ queues created on the matching address. Defaults to 0. Read more about
|
||||||
`enable-metrics` determines whether or not metrics will be published to any
|
`enable-metrics` determines whether or not metrics will be published to any
|
||||||
configured metrics plugin for the matching address. Default is `true`. Read more
|
configured metrics plugin for the matching address. Default is `true`. Read more
|
||||||
about [metrics](metrics.md).
|
about [metrics](metrics.md).
|
||||||
|
|
||||||
|
`enable-ingress-timestamp` determines whether or not the broker will add its time
|
||||||
|
to messages sent to the matching address. When `true` the exact behavior will
|
||||||
|
depend on the specific protocol in use. For AMQP messages the broker will add a
|
||||||
|
`long` *message annotation* named `x-opt-ingress-time`. For core messages (used by
|
||||||
|
the core and OpenWire protocols) the broker will add a long property named
|
||||||
|
`_AMQ_INGRESS_TIMESTAMP`. For STOMP messages the broker will add a frame header
|
||||||
|
named `ingress-timestamp`. The value will be the number of milliseconds since the
|
||||||
|
[epoch](https://en.wikipedia.org/wiki/Unix_time). Default is `false`.
|
||||||
|
|
|
@ -16,6 +16,12 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.activemq.artemis.tests.integration.amqp;
|
package org.apache.activemq.artemis.tests.integration.amqp;
|
||||||
|
|
||||||
|
import javax.jms.BytesMessage;
|
||||||
|
import javax.jms.Connection;
|
||||||
|
import javax.jms.ConnectionFactory;
|
||||||
|
import javax.jms.DeliveryMode;
|
||||||
|
import javax.jms.MessageProducer;
|
||||||
|
import javax.jms.Session;
|
||||||
import javax.management.MBeanServer;
|
import javax.management.MBeanServer;
|
||||||
import javax.management.MBeanServerFactory;
|
import javax.management.MBeanServerFactory;
|
||||||
import java.net.URI;
|
import java.net.URI;
|
||||||
|
@ -25,10 +31,17 @@ import java.util.HashSet;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
|
||||||
|
import org.apache.activemq.ActiveMQConnectionFactory;
|
||||||
import org.apache.activemq.artemis.api.core.QueueConfiguration;
|
import org.apache.activemq.artemis.api.core.QueueConfiguration;
|
||||||
import org.apache.activemq.artemis.api.core.RoutingType;
|
import org.apache.activemq.artemis.api.core.RoutingType;
|
||||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||||
import org.apache.activemq.artemis.api.core.TransportConfiguration;
|
import org.apache.activemq.artemis.api.core.TransportConfiguration;
|
||||||
|
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
|
||||||
|
import org.apache.activemq.artemis.api.core.client.ClientMessage;
|
||||||
|
import org.apache.activemq.artemis.api.core.client.ClientProducer;
|
||||||
|
import org.apache.activemq.artemis.api.core.client.ClientSession;
|
||||||
|
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
|
||||||
|
import org.apache.activemq.artemis.api.core.client.ServerLocator;
|
||||||
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
|
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
|
||||||
import org.apache.activemq.artemis.core.security.Role;
|
import org.apache.activemq.artemis.core.security.Role;
|
||||||
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||||
|
@ -385,6 +398,10 @@ public class AmqpClientTestSupport extends AmqpTestSupport {
|
||||||
}
|
}
|
||||||
|
|
||||||
protected void sendMessages(String destinationName, int count, boolean durable) throws Exception {
|
protected void sendMessages(String destinationName, int count, boolean durable) throws Exception {
|
||||||
|
sendMessages(destinationName, count, durable, null);
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void sendMessages(String destinationName, int count, boolean durable, byte[] payload) throws Exception {
|
||||||
AmqpClient client = createAmqpClient();
|
AmqpClient client = createAmqpClient();
|
||||||
AmqpConnection connection = addConnection(client.connect());
|
AmqpConnection connection = addConnection(client.connect());
|
||||||
try {
|
try {
|
||||||
|
@ -395,6 +412,9 @@ public class AmqpClientTestSupport extends AmqpTestSupport {
|
||||||
AmqpMessage message = new AmqpMessage();
|
AmqpMessage message = new AmqpMessage();
|
||||||
message.setMessageId("MessageID:" + i);
|
message.setMessageId("MessageID:" + i);
|
||||||
message.setDurable(durable);
|
message.setDurable(durable);
|
||||||
|
if (payload != null) {
|
||||||
|
message.setBytes(payload);
|
||||||
|
}
|
||||||
sender.send(message);
|
sender.send(message);
|
||||||
}
|
}
|
||||||
} finally {
|
} finally {
|
||||||
|
@ -402,6 +422,57 @@ public class AmqpClientTestSupport extends AmqpTestSupport {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected void sendMessagesCore(String destinationName, int count, boolean durable) throws Exception {
|
||||||
|
sendMessagesCore(destinationName, count, durable, null);
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void sendMessagesCore(String destinationName, int count, boolean durable, byte[] body) throws Exception {
|
||||||
|
ServerLocator serverLocator = ActiveMQClient.createServerLocator("tcp://127.0.0.1:5672");
|
||||||
|
ClientSessionFactory clientSessionFactory = serverLocator.createSessionFactory();
|
||||||
|
ClientSession session = clientSessionFactory.createSession();
|
||||||
|
try {
|
||||||
|
ClientProducer sender = session.createProducer(destinationName);
|
||||||
|
|
||||||
|
for (int i = 0; i < count; ++i) {
|
||||||
|
ClientMessage message = session.createMessage(durable);
|
||||||
|
if (body != null) {
|
||||||
|
message.getBodyBuffer().writeBytes(body);
|
||||||
|
}
|
||||||
|
sender.send(message);
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
session.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void sendMessagesOpenWire(String destinationName, int count, boolean durable) throws Exception {
|
||||||
|
sendMessagesOpenWire(destinationName, count, durable, null);
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void sendMessagesOpenWire(String destinationName, int count, boolean durable, byte[] payload) throws Exception {
|
||||||
|
ConnectionFactory cf = new ActiveMQConnectionFactory("tcp://127.0.0.1:5672");
|
||||||
|
Connection connection = cf.createConnection();
|
||||||
|
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
|
try {
|
||||||
|
MessageProducer producer = session.createProducer(session.createQueue(destinationName));
|
||||||
|
if (durable) {
|
||||||
|
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
|
||||||
|
} else {
|
||||||
|
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
|
||||||
|
}
|
||||||
|
|
||||||
|
for (int i = 0; i < count; ++i) {
|
||||||
|
BytesMessage message = session.createBytesMessage();
|
||||||
|
if (payload != null) {
|
||||||
|
message.writeBytes(payload);
|
||||||
|
}
|
||||||
|
producer.send(message);
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
connection.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
protected Source createDynamicSource(boolean topic) {
|
protected Source createDynamicSource(boolean topic) {
|
||||||
|
|
||||||
Source source = new Source();
|
Source source = new Source();
|
||||||
|
|
|
@ -0,0 +1,154 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.activemq.artemis.tests.integration.amqp;
|
||||||
|
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.Collection;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
import org.apache.activemq.artemis.api.core.QueueConfiguration;
|
||||||
|
import org.apache.activemq.artemis.api.core.RoutingType;
|
||||||
|
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
|
||||||
|
import org.apache.activemq.artemis.core.server.Queue;
|
||||||
|
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
|
||||||
|
import org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport;
|
||||||
|
import org.apache.activemq.artemis.tests.util.RandomUtil;
|
||||||
|
import org.apache.activemq.artemis.tests.util.Wait;
|
||||||
|
import org.apache.activemq.transport.amqp.client.AmqpClient;
|
||||||
|
import org.apache.activemq.transport.amqp.client.AmqpConnection;
|
||||||
|
import org.apache.activemq.transport.amqp.client.AmqpMessage;
|
||||||
|
import org.apache.activemq.transport.amqp.client.AmqpReceiver;
|
||||||
|
import org.apache.activemq.transport.amqp.client.AmqpSession;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.junit.runner.RunWith;
|
||||||
|
import org.junit.runners.Parameterized;
|
||||||
|
|
||||||
|
@RunWith(Parameterized.class)
|
||||||
|
public class AmqpIngressTimestampTest extends AmqpClientTestSupport {
|
||||||
|
|
||||||
|
public int amqpMinLargeMessageSize = ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE;
|
||||||
|
|
||||||
|
@Parameterized.Parameters(name = "restart={0}, large={1}")
|
||||||
|
public static Collection<Object[]> parameters() {
|
||||||
|
return Arrays.asList(new Object[][] {
|
||||||
|
{true, true},
|
||||||
|
{false, false},
|
||||||
|
{true, false},
|
||||||
|
{false, true}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
@Parameterized.Parameter(0)
|
||||||
|
public boolean restart;
|
||||||
|
|
||||||
|
@Parameterized.Parameter(1)
|
||||||
|
public boolean large;
|
||||||
|
|
||||||
|
@Test(timeout = 60000)
|
||||||
|
public void testIngressTimestampSendCore() throws Exception {
|
||||||
|
internalTestIngressTimestamp(Protocol.CORE);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 60000)
|
||||||
|
public void testIngressTimestampSendAMQP() throws Exception {
|
||||||
|
internalTestIngressTimestamp(Protocol.AMQP);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 60000)
|
||||||
|
public void testIngressTimestampSendOpenWire() throws Exception {
|
||||||
|
internalTestIngressTimestamp(Protocol.OPENWIRE);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void internalTestIngressTimestamp(Protocol protocol) throws Exception {
|
||||||
|
final String QUEUE_NAME = RandomUtil.randomString();
|
||||||
|
server.createQueue(new QueueConfiguration(QUEUE_NAME).setRoutingType(RoutingType.ANYCAST));
|
||||||
|
server.getAddressSettingsRepository().addMatch(QUEUE_NAME, new AddressSettings().setEnableIngressTimestamp(true));
|
||||||
|
long beforeSend = System.currentTimeMillis();
|
||||||
|
if (protocol == Protocol.CORE) {
|
||||||
|
sendMessagesCore(QUEUE_NAME, 1, true, getMessagePayload());
|
||||||
|
} else if (protocol == Protocol.OPENWIRE) {
|
||||||
|
sendMessagesOpenWire(QUEUE_NAME, 1, true, getMessagePayload());
|
||||||
|
} else {
|
||||||
|
sendMessages(QUEUE_NAME, 1, true, getMessagePayload());
|
||||||
|
}
|
||||||
|
long afterSend = System.currentTimeMillis();
|
||||||
|
|
||||||
|
if (restart) {
|
||||||
|
server.stop();
|
||||||
|
server.start();
|
||||||
|
assertTrue(server.waitForActivation(3, TimeUnit.SECONDS));
|
||||||
|
}
|
||||||
|
|
||||||
|
AmqpClient client = createAmqpClient();
|
||||||
|
AmqpConnection connection = addConnection(client.connect());
|
||||||
|
AmqpSession session = connection.createSession();
|
||||||
|
|
||||||
|
AmqpReceiver receiver = session.createReceiver(QUEUE_NAME);
|
||||||
|
|
||||||
|
Queue queueView = getProxyToQueue(QUEUE_NAME);
|
||||||
|
Wait.assertEquals(1L, queueView::getMessageCount, 2000, 100, false);
|
||||||
|
|
||||||
|
receiver.flow(1);
|
||||||
|
AmqpMessage receive = receiver.receive(5, TimeUnit.SECONDS);
|
||||||
|
assertNotNull(receive);
|
||||||
|
instanceLog.info(receive);
|
||||||
|
Object ingressTimestampHeader = receive.getMessageAnnotation(AMQPMessageSupport.X_OPT_INGRESS_TIME);
|
||||||
|
assertNotNull(ingressTimestampHeader);
|
||||||
|
assertTrue(ingressTimestampHeader instanceof Long);
|
||||||
|
long ingressTimestamp = (Long) ingressTimestampHeader;
|
||||||
|
assertTrue("Ingress timstamp " + ingressTimestamp + " should be >= " + beforeSend + " and <= " + afterSend,ingressTimestamp >= beforeSend && ingressTimestamp <= afterSend);
|
||||||
|
receiver.close();
|
||||||
|
|
||||||
|
assertEquals(1, queueView.getMessageCount());
|
||||||
|
|
||||||
|
connection.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected String getConfiguredProtocols() {
|
||||||
|
return "AMQP,OPENWIRE,CORE";
|
||||||
|
}
|
||||||
|
|
||||||
|
private enum Protocol {
|
||||||
|
CORE, AMQP, OPENWIRE
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void setData(AmqpMessage amqpMessage) throws Exception {
|
||||||
|
amqpMessage.setBytes(getMessagePayload());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void configureAMQPAcceptorParameters(Map<String, Object> params) {
|
||||||
|
params.put("amqpMinLargeMessageSize", ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE);
|
||||||
|
}
|
||||||
|
|
||||||
|
private byte[] getMessagePayload() {
|
||||||
|
StringBuilder result = new StringBuilder();
|
||||||
|
if (large) {
|
||||||
|
for (int i = 0; i < ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE * 20; i++) {
|
||||||
|
result.append("AB");
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
result.append("AB");
|
||||||
|
}
|
||||||
|
|
||||||
|
return result.toString().getBytes();
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,207 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.activemq.artemis.tests.integration.client;
|
||||||
|
|
||||||
|
import javax.jms.Connection;
|
||||||
|
import javax.jms.ConnectionFactory;
|
||||||
|
import javax.jms.DeliveryMode;
|
||||||
|
import javax.jms.MessageConsumer;
|
||||||
|
import javax.jms.MessageProducer;
|
||||||
|
import javax.jms.Session;
|
||||||
|
import javax.jms.TextMessage;
|
||||||
|
import java.lang.reflect.Method;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.Collection;
|
||||||
|
import java.util.Enumeration;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
import org.apache.activemq.artemis.api.core.Message;
|
||||||
|
import org.apache.activemq.artemis.api.core.QueueConfiguration;
|
||||||
|
import org.apache.activemq.artemis.api.core.RoutingType;
|
||||||
|
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||||
|
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
|
||||||
|
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||||
|
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
|
||||||
|
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
|
||||||
|
import org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport;
|
||||||
|
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
|
||||||
|
import org.apache.qpid.jms.JmsConnectionFactory;
|
||||||
|
import org.apache.qpid.jms.message.JmsTextMessage;
|
||||||
|
import org.apache.qpid.jms.provider.amqp.message.AmqpJmsMessageFacade;
|
||||||
|
import org.apache.qpid.proton.amqp.Symbol;
|
||||||
|
import org.junit.Assert;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.junit.runner.RunWith;
|
||||||
|
import org.junit.runners.Parameterized;
|
||||||
|
|
||||||
|
@RunWith(Parameterized.class)
|
||||||
|
public class IngressTimestampTest extends ActiveMQTestBase {
|
||||||
|
private ActiveMQServer server;
|
||||||
|
|
||||||
|
private final SimpleString QUEUE = new SimpleString("ConsumerTestQueue");
|
||||||
|
|
||||||
|
@Parameterized.Parameters(name = "restart={0}, large={1}")
|
||||||
|
public static Collection<Object[]> parameters() {
|
||||||
|
return Arrays.asList(new Object[][] {
|
||||||
|
{true, true},
|
||||||
|
{false, false},
|
||||||
|
{true, false},
|
||||||
|
{false, true}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
@Parameterized.Parameter(0)
|
||||||
|
public boolean restart;
|
||||||
|
|
||||||
|
@Parameterized.Parameter(1)
|
||||||
|
public boolean large;
|
||||||
|
|
||||||
|
@Before
|
||||||
|
@Override
|
||||||
|
public void setUp() throws Exception {
|
||||||
|
super.setUp();
|
||||||
|
server = createServer(true, true);
|
||||||
|
server.start();
|
||||||
|
server.getAddressSettingsRepository().addMatch("#", new AddressSettings().setEnableIngressTimestamp(true));
|
||||||
|
server.createQueue(new QueueConfiguration(QUEUE).setRoutingType(RoutingType.ANYCAST));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSendCoreReceiveAMQP() throws Throwable {
|
||||||
|
internalSendReceive(Protocol.CORE, Protocol.AMQP);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSendAMQPReceiveAMQP() throws Throwable {
|
||||||
|
internalSendReceive(Protocol.AMQP, Protocol.AMQP);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSendOpenWireReceiveAMQP() throws Throwable {
|
||||||
|
internalSendReceive(Protocol.OPENWIRE, Protocol.AMQP);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSendCoreReceiveCore() throws Throwable {
|
||||||
|
internalSendReceive(Protocol.CORE, Protocol.CORE);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSendAMQPReceiveCore() throws Throwable {
|
||||||
|
internalSendReceive(Protocol.AMQP, Protocol.CORE);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSendOpenWireReceiveCore() throws Throwable {
|
||||||
|
internalSendReceive(Protocol.OPENWIRE, Protocol.CORE);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSendCoreReceiveOpenwire() throws Throwable {
|
||||||
|
internalSendReceive(Protocol.CORE, Protocol.OPENWIRE);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSendAMQPReceiveOpenWire() throws Throwable {
|
||||||
|
internalSendReceive(Protocol.AMQP, Protocol.OPENWIRE);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSendOpenWireReceiveOpenWire() throws Throwable {
|
||||||
|
internalSendReceive(Protocol.OPENWIRE, Protocol.OPENWIRE);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void internalSendReceive(Protocol protocolSender, Protocol protocolConsumer) throws Throwable {
|
||||||
|
ConnectionFactory factorySend = createFactory(protocolSender);
|
||||||
|
ConnectionFactory factoryConsume = protocolConsumer == protocolSender ? factorySend : createFactory(protocolConsumer);
|
||||||
|
|
||||||
|
long beforeSend, afterSend;
|
||||||
|
try (Connection connection = factorySend.createConnection()) {
|
||||||
|
try (Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)) {
|
||||||
|
javax.jms.Queue queue = session.createQueue(QUEUE.toString());
|
||||||
|
try (MessageProducer producer = session.createProducer(queue)) {
|
||||||
|
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
|
||||||
|
TextMessage msg = session.createTextMessage(getMessagePayload());
|
||||||
|
beforeSend = System.currentTimeMillis();
|
||||||
|
producer.send(msg);
|
||||||
|
afterSend = System.currentTimeMillis();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (restart) {
|
||||||
|
server.stop();
|
||||||
|
server.start();
|
||||||
|
assertTrue(server.waitForActivation(3, TimeUnit.SECONDS));
|
||||||
|
}
|
||||||
|
|
||||||
|
try (Connection connection = factoryConsume.createConnection()) {
|
||||||
|
connection.start();
|
||||||
|
try (Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)) {
|
||||||
|
javax.jms.Queue queue = session.createQueue(QUEUE.toString());
|
||||||
|
try (MessageConsumer consumer = session.createConsumer(queue)) {
|
||||||
|
TextMessage message = (TextMessage) consumer.receive(1000);
|
||||||
|
Assert.assertNotNull(message);
|
||||||
|
Enumeration e = message.getPropertyNames();
|
||||||
|
while (e.hasMoreElements()) {
|
||||||
|
System.out.println(e.nextElement());
|
||||||
|
}
|
||||||
|
Object ingressTimestampHeader = null;
|
||||||
|
if (protocolConsumer == Protocol.AMQP) {
|
||||||
|
// Qpid JMS doesn't expose message annotations so we must use reflection here
|
||||||
|
Method getMessageAnnotation = AmqpJmsMessageFacade.class.getDeclaredMethod("getMessageAnnotation", Symbol.class);
|
||||||
|
getMessageAnnotation.setAccessible(true);
|
||||||
|
ingressTimestampHeader = getMessageAnnotation.invoke(((JmsTextMessage)message).getFacade(), Symbol.getSymbol(AMQPMessageSupport.X_OPT_INGRESS_TIME));
|
||||||
|
} else {
|
||||||
|
ingressTimestampHeader = message.getObjectProperty(Message.HDR_INGRESS_TIMESTAMP.toString());
|
||||||
|
}
|
||||||
|
assertNotNull(ingressTimestampHeader);
|
||||||
|
assertTrue(ingressTimestampHeader instanceof Long);
|
||||||
|
long ingressTimestamp = (Long) ingressTimestampHeader;
|
||||||
|
assertTrue("Ingress timstamp " + ingressTimestamp + " should be >= " + beforeSend + " and <= " + afterSend,ingressTimestamp >= beforeSend && ingressTimestamp <= afterSend);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private String getMessagePayload() {
|
||||||
|
StringBuilder result = new StringBuilder();
|
||||||
|
if (large) {
|
||||||
|
for (int i = 0; i < ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE * 20; i++) {
|
||||||
|
result.append("AB");
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
result.append("AB");
|
||||||
|
}
|
||||||
|
|
||||||
|
return result.toString();
|
||||||
|
}
|
||||||
|
|
||||||
|
private ConnectionFactory createFactory(Protocol protocol) {
|
||||||
|
switch (protocol) {
|
||||||
|
case CORE: return new ActiveMQConnectionFactory(); // core protocol
|
||||||
|
case AMQP: return new JmsConnectionFactory("amqp://localhost:61616"); // amqp
|
||||||
|
case OPENWIRE: return new org.apache.activemq.ActiveMQConnectionFactory("tcp://localhost:61616"); // openwire
|
||||||
|
default: return null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private enum Protocol {
|
||||||
|
CORE, AMQP, OPENWIRE
|
||||||
|
}
|
||||||
|
}
|
|
@ -637,6 +637,27 @@ public class StompTest extends StompTestBase {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testIngressTimestamp() throws Exception {
|
||||||
|
server.getAddressSettingsRepository().addMatch("#", new AddressSettings().setEnableIngressTimestamp(true));
|
||||||
|
conn.connect(defUser, defPass);
|
||||||
|
|
||||||
|
subscribe(conn, null, Stomp.Headers.Subscribe.AckModeValues.AUTO);
|
||||||
|
long beforeSend = System.currentTimeMillis();
|
||||||
|
sendJmsMessage(getName());
|
||||||
|
long afterSend = System.currentTimeMillis();
|
||||||
|
|
||||||
|
ClientStompFrame frame = conn.receiveFrame(10000);
|
||||||
|
Assert.assertEquals(Stomp.Responses.MESSAGE, frame.getCommand());
|
||||||
|
Assert.assertEquals(getQueuePrefix() + getQueueName(), frame.getHeader(Stomp.Headers.Send.DESTINATION));
|
||||||
|
String ingressTimestampHeader = frame.getHeader(Stomp.Headers.Message.INGRESS_TIMESTAMP);
|
||||||
|
Assert.assertNotNull(ingressTimestampHeader);
|
||||||
|
long ingressTimestamp = Long.parseLong(ingressTimestampHeader);
|
||||||
|
assertTrue("Ingress timstamp " + ingressTimestamp + " should be >= " + beforeSend + " and <= " + afterSend,ingressTimestamp >= beforeSend && ingressTimestamp <= afterSend);
|
||||||
|
|
||||||
|
conn.disconnect();
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testAnycastDestinationTypeMessageProperty() throws Exception {
|
public void testAnycastDestinationTypeMessageProperty() throws Exception {
|
||||||
conn.connect(defUser, defPass);
|
conn.connect(defUser, defPass);
|
||||||
|
|
Loading…
Reference in New Issue