This closes #1573
This commit is contained in:
commit
e1a87ac830
|
@ -33,6 +33,7 @@ import org.apache.activemq.artemis.api.core.RoutingType;
|
|||
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||
import org.apache.activemq.artemis.core.persistence.Persister;
|
||||
import org.apache.activemq.artemis.protocol.amqp.converter.AMQPConverter;
|
||||
import org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageIdHelper;
|
||||
import org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport;
|
||||
import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable;
|
||||
import org.apache.activemq.artemis.protocol.amqp.util.TLSEncode;
|
||||
|
@ -557,7 +558,6 @@ public class AMQPMessage extends RefCountMessage {
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public org.apache.activemq.artemis.api.core.Message setUserID(Object userID) {
|
||||
return null;
|
||||
|
@ -725,7 +725,6 @@ public class AMQPMessage extends RefCountMessage {
|
|||
return this;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public byte[] getExtraBytesProperty(SimpleString key) throws ActiveMQPropertyConversionException {
|
||||
if (extraProperties == null) {
|
||||
|
@ -735,7 +734,6 @@ public class AMQPMessage extends RefCountMessage {
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public byte[] removeExtraBytesProperty(SimpleString key) throws ActiveMQPropertyConversionException {
|
||||
if (extraProperties == null) {
|
||||
|
@ -745,8 +743,6 @@ public class AMQPMessage extends RefCountMessage {
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
@Override
|
||||
public org.apache.activemq.artemis.api.core.Message putBooleanProperty(String key, boolean value) {
|
||||
getApplicationPropertiesMap().put(key, Boolean.valueOf(value));
|
||||
|
@ -904,9 +900,19 @@ public class AMQPMessage extends RefCountMessage {
|
|||
@Override
|
||||
public Object getObjectProperty(String key) {
|
||||
if (key.equals(MessageUtil.TYPE_HEADER_NAME.toString())) {
|
||||
if (getProperties() != null) {
|
||||
return getProperties().getSubject();
|
||||
}
|
||||
} else if (key.equals(MessageUtil.CONNECTION_ID_PROPERTY_NAME.toString())) {
|
||||
return getConnectionID();
|
||||
} else if (key.equals(MessageUtil.JMSXGROUPID)) {
|
||||
return getGroupID();
|
||||
} else if (key.equals(MessageUtil.JMSXUSERID)) {
|
||||
return getAMQPUserID();
|
||||
} else if (key.equals(MessageUtil.CORRELATIONID_HEADER_NAME.toString())) {
|
||||
if (getProperties() != null && getProperties().getCorrelationId() != null) {
|
||||
return AMQPMessageIdHelper.INSTANCE.toCorrelationIdString(getProperties().getCorrelationId());
|
||||
}
|
||||
} else {
|
||||
Object value = getApplicationPropertiesMap().get(key);
|
||||
if (value instanceof UnsignedInteger ||
|
||||
|
@ -918,6 +924,8 @@ public class AMQPMessage extends RefCountMessage {
|
|||
return value;
|
||||
}
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -1,5 +1,4 @@
|
|||
/*
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
|
@ -16,7 +15,6 @@
|
|||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*
|
||||
*/
|
||||
package org.apache.activemq.artemis.protocol.amqp.converter;
|
||||
|
||||
|
@ -28,26 +26,32 @@ import org.apache.qpid.proton.amqp.Binary;
|
|||
import org.apache.qpid.proton.amqp.UnsignedLong;
|
||||
|
||||
/**
|
||||
* Helper class for identifying and converting message-id and correlation-id values between the
|
||||
* AMQP types and the Strings values used by JMS.
|
||||
* Helper class for identifying and converting message-id and correlation-id
|
||||
* values between the AMQP types and the Strings values used by JMS.
|
||||
*
|
||||
* <p>
|
||||
* AMQP messages allow for 4 types of message-id/correlation-id: message-id-string,
|
||||
* message-id-binary, message-id-uuid, or message-id-ulong. In order to accept or return a
|
||||
* string representation of these for interoperability with other AMQP clients, the following
|
||||
* encoding can be used after removing or before adding the "ID:" prefix used for a JMSMessageID
|
||||
* AMQP messages allow for 4 types of message-id/correlation-id:
|
||||
* message-id-string, message-id-binary, message-id-uuid, or message-id-ulong.
|
||||
* In order to accept or return a string representation of these for
|
||||
* interoperability with other AMQP clients, the following encoding can be used
|
||||
* after removing or before adding the "ID:" prefix used for a JMSMessageID
|
||||
* value:<br>
|
||||
* <p>
|
||||
*
|
||||
* {@literal "AMQP_BINARY:<hex representation of binary content>"}<br>
|
||||
* {@literal "AMQP_UUID:<string representation of uuid>"}<br>
|
||||
* {@literal "AMQP_ULONG:<string representation of ulong>"}<br>
|
||||
* {@literal "AMQP_STRING:<string>"}<br>
|
||||
*
|
||||
* <p>
|
||||
* The AMQP_STRING encoding exists only for escaping message-id-string values that happen to
|
||||
* begin with one of the encoding prefixes (including AMQP_STRING itself). It MUST NOT be used
|
||||
* otherwise.
|
||||
* The AMQP_STRING encoding exists only for escaping message-id-string values
|
||||
* that happen to begin with one of the encoding prefixes (including AMQP_STRING
|
||||
* itself). It MUST NOT be used otherwise.
|
||||
*
|
||||
* <p>
|
||||
* When provided a string for conversion which attempts to identify itself as an encoded binary,
|
||||
* uuid, or ulong but can't be converted into the indicated format, an exception will be thrown.
|
||||
* When provided a string for conversion which attempts to identify itself as an
|
||||
* encoded binary, uuid, or ulong but can't be converted into the indicated
|
||||
* format, an exception will be thrown.
|
||||
*
|
||||
*/
|
||||
public class AMQPMessageIdHelper {
|
||||
|
||||
|
@ -57,109 +61,213 @@ public class AMQPMessageIdHelper {
|
|||
public static final String AMQP_UUID_PREFIX = "AMQP_UUID:";
|
||||
public static final String AMQP_ULONG_PREFIX = "AMQP_ULONG:";
|
||||
public static final String AMQP_BINARY_PREFIX = "AMQP_BINARY:";
|
||||
public static final String AMQP_NO_PREFIX = "AMQP_NO_PREFIX:";
|
||||
public static final String JMS_ID_PREFIX = "ID:";
|
||||
|
||||
private static final String AMQP_PREFIX = "AMQP_";
|
||||
private static final int JMS_ID_PREFIX_LENGTH = JMS_ID_PREFIX.length();
|
||||
private static final int AMQP_UUID_PREFIX_LENGTH = AMQP_UUID_PREFIX.length();
|
||||
private static final int AMQP_ULONG_PREFIX_LENGTH = AMQP_ULONG_PREFIX.length();
|
||||
private static final int AMQP_STRING_PREFIX_LENGTH = AMQP_STRING_PREFIX.length();
|
||||
private static final int AMQP_BINARY_PREFIX_LENGTH = AMQP_BINARY_PREFIX.length();
|
||||
private static final int AMQP_NO_PREFIX_LENGTH = AMQP_NO_PREFIX.length();
|
||||
private static final char[] HEX_CHARS = "0123456789ABCDEF".toCharArray();
|
||||
|
||||
/**
|
||||
* Takes the provided AMQP messageId style object, and convert it to a base string. Encodes
|
||||
* type information as a prefix where necessary to convey or escape the type of the provided
|
||||
* object.
|
||||
* Checks whether the given string begins with "ID:" prefix used to denote a
|
||||
* JMSMessageID
|
||||
*
|
||||
* @param messageId
|
||||
* the raw messageId object to process
|
||||
* @return the base string to be used in creating the actual id.
|
||||
* @param string
|
||||
* the string to check
|
||||
* @return true if and only id the string begins with "ID:"
|
||||
*/
|
||||
public String toBaseMessageIdString(Object messageId) {
|
||||
if (messageId == null) {
|
||||
return null;
|
||||
} else if (messageId instanceof String) {
|
||||
String stringId = (String) messageId;
|
||||
public boolean hasMessageIdPrefix(String string) {
|
||||
if (string == null) {
|
||||
return false;
|
||||
}
|
||||
|
||||
// If the given string has a type encoding prefix,
|
||||
// we need to escape it as an encoded string (even if
|
||||
// the existing encoding prefix was also for string)
|
||||
if (hasTypeEncodingPrefix(stringId)) {
|
||||
return AMQP_STRING_PREFIX + stringId;
|
||||
return string.startsWith(JMS_ID_PREFIX);
|
||||
}
|
||||
|
||||
public String toMessageIdString(Object idObject) {
|
||||
if (idObject instanceof String) {
|
||||
final String stringId = (String) idObject;
|
||||
|
||||
boolean hasMessageIdPrefix = hasMessageIdPrefix(stringId);
|
||||
if (!hasMessageIdPrefix) {
|
||||
// For JMSMessageID, has no "ID:" prefix, we need to record
|
||||
// that for later use as a JMSCorrelationID.
|
||||
return JMS_ID_PREFIX + AMQP_NO_PREFIX + stringId;
|
||||
} else if (hasTypeEncodingPrefix(stringId, JMS_ID_PREFIX_LENGTH)) {
|
||||
// We are for a JMSMessageID value, but have 'ID:' followed by
|
||||
// one of the encoding prefixes. Need to escape the entire string
|
||||
// to preserve for later re-use as a JMSCorrelationID.
|
||||
return JMS_ID_PREFIX + AMQP_STRING_PREFIX + stringId;
|
||||
} else {
|
||||
// It has "ID:" prefix and doesn't have encoding prefix, use it as-is.
|
||||
return stringId;
|
||||
}
|
||||
} else if (messageId instanceof UUID) {
|
||||
return AMQP_UUID_PREFIX + messageId.toString();
|
||||
} else if (messageId instanceof UnsignedLong) {
|
||||
return AMQP_ULONG_PREFIX + messageId.toString();
|
||||
} else if (messageId instanceof Binary) {
|
||||
ByteBuffer dup = ((Binary) messageId).asByteBuffer();
|
||||
} else {
|
||||
// Not a string, convert it
|
||||
return convertToIdString(idObject);
|
||||
}
|
||||
}
|
||||
|
||||
public String toCorrelationIdString(Object idObject) {
|
||||
if (idObject instanceof String) {
|
||||
final String stringId = (String) idObject;
|
||||
|
||||
boolean hasMessageIdPrefix = hasMessageIdPrefix(stringId);
|
||||
if (!hasMessageIdPrefix) {
|
||||
// For JMSCorrelationID, has no "ID:" prefix, use it as-is.
|
||||
return stringId;
|
||||
} else if (hasTypeEncodingPrefix(stringId, JMS_ID_PREFIX_LENGTH)) {
|
||||
// We are for a JMSCorrelationID value, but have 'ID:' followed by
|
||||
// one of the encoding prefixes. Need to escape the entire string
|
||||
// to preserve for later re-use as a JMSCorrelationID.
|
||||
return JMS_ID_PREFIX + AMQP_STRING_PREFIX + stringId;
|
||||
} else {
|
||||
// It has "ID:" prefix and doesn't have encoding prefix, use it as-is.
|
||||
return stringId;
|
||||
}
|
||||
} else {
|
||||
// Not a string, convert it
|
||||
return convertToIdString(idObject);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Takes the provided non-String AMQP message-id/correlation-id object, and
|
||||
* convert it it to a String usable as either a JMSMessageID or
|
||||
* JMSCorrelationID value, encoding the type information as a prefix to
|
||||
* convey for later use in reversing the process if used to set
|
||||
* JMSCorrelationID on a message.
|
||||
*
|
||||
* @param idObject
|
||||
* the object to process
|
||||
* @return string to be used for the actual JMS ID.
|
||||
*/
|
||||
private String convertToIdString(Object idObject) {
|
||||
if (idObject == null) {
|
||||
return null;
|
||||
}
|
||||
|
||||
if (idObject instanceof UUID) {
|
||||
return JMS_ID_PREFIX + AMQP_UUID_PREFIX + idObject.toString();
|
||||
} else if (idObject instanceof UnsignedLong) {
|
||||
return JMS_ID_PREFIX + AMQP_ULONG_PREFIX + idObject.toString();
|
||||
} else if (idObject instanceof Binary) {
|
||||
ByteBuffer dup = ((Binary) idObject).asByteBuffer();
|
||||
|
||||
byte[] bytes = new byte[dup.remaining()];
|
||||
dup.get(bytes);
|
||||
|
||||
String hex = convertBinaryToHexString(bytes);
|
||||
|
||||
return AMQP_BINARY_PREFIX + hex;
|
||||
return JMS_ID_PREFIX + AMQP_BINARY_PREFIX + hex;
|
||||
} else {
|
||||
throw new IllegalArgumentException("Unsupported type provided: " + messageId.getClass());
|
||||
throw new IllegalArgumentException("Unsupported type provided: " + idObject.getClass());
|
||||
}
|
||||
}
|
||||
|
||||
private boolean hasTypeEncodingPrefix(String stringId, int offset) {
|
||||
if (!stringId.startsWith(AMQP_PREFIX, offset)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
return hasAmqpBinaryPrefix(stringId, offset) || hasAmqpUuidPrefix(stringId, offset) || hasAmqpUlongPrefix(stringId, offset)
|
||||
|| hasAmqpStringPrefix(stringId, offset) || hasAmqpNoPrefix(stringId, offset);
|
||||
}
|
||||
|
||||
private boolean hasAmqpStringPrefix(String stringId, int offset) {
|
||||
return stringId.startsWith(AMQP_STRING_PREFIX, offset);
|
||||
}
|
||||
|
||||
private boolean hasAmqpUlongPrefix(String stringId, int offset) {
|
||||
return stringId.startsWith(AMQP_ULONG_PREFIX, offset);
|
||||
}
|
||||
|
||||
private boolean hasAmqpUuidPrefix(String stringId, int offset) {
|
||||
return stringId.startsWith(AMQP_UUID_PREFIX, offset);
|
||||
}
|
||||
|
||||
private boolean hasAmqpBinaryPrefix(String stringId, int offset) {
|
||||
return stringId.startsWith(AMQP_BINARY_PREFIX, offset);
|
||||
}
|
||||
|
||||
private boolean hasAmqpNoPrefix(String stringId, int offset) {
|
||||
return stringId.startsWith(AMQP_NO_PREFIX, offset);
|
||||
}
|
||||
|
||||
/**
|
||||
* Takes the provided base id string and return the appropriate amqp messageId style object.
|
||||
* Converts the type based on any relevant encoding information found as a prefix.
|
||||
* Takes the provided id string and return the appropriate amqp messageId
|
||||
* style object. Converts the type based on any relevant encoding information
|
||||
* found as a prefix.
|
||||
*
|
||||
* @param baseId
|
||||
* the object to be converted to an AMQP MessageId value.
|
||||
* @param origId
|
||||
* the object to be converted
|
||||
* @return the AMQP messageId style object
|
||||
* @throws ActiveMQAMQPIllegalStateException
|
||||
* if the provided baseId String indicates an encoded type but can't be converted to
|
||||
* that type.
|
||||
*
|
||||
* @throws IllegalArgument
|
||||
* if the provided baseId String indicates an encoded type but can't
|
||||
* be converted to that type.
|
||||
*/
|
||||
public Object toIdObject(String baseId) throws ActiveMQAMQPIllegalStateException {
|
||||
if (baseId == null) {
|
||||
public Object toIdObject(final String origId) throws ActiveMQAMQPIllegalStateException {
|
||||
if (origId == null) {
|
||||
return null;
|
||||
}
|
||||
|
||||
if (!AMQPMessageIdHelper.INSTANCE.hasMessageIdPrefix(origId)) {
|
||||
// We have a string without any "ID:" prefix, it is an
|
||||
// application-specific String, use it as-is.
|
||||
return origId;
|
||||
}
|
||||
|
||||
try {
|
||||
if (hasAmqpUuidPrefix(baseId)) {
|
||||
String uuidString = strip(baseId, AMQP_UUID_PREFIX_LENGTH);
|
||||
if (hasAmqpNoPrefix(origId, JMS_ID_PREFIX_LENGTH)) {
|
||||
// Prefix telling us there was originally no "ID:" prefix,
|
||||
// strip it and return the remainder
|
||||
return origId.substring(JMS_ID_PREFIX_LENGTH + AMQP_NO_PREFIX_LENGTH);
|
||||
} else if (hasAmqpUuidPrefix(origId, JMS_ID_PREFIX_LENGTH)) {
|
||||
String uuidString = origId.substring(JMS_ID_PREFIX_LENGTH + AMQP_UUID_PREFIX_LENGTH);
|
||||
return UUID.fromString(uuidString);
|
||||
} else if (hasAmqpUlongPrefix(baseId)) {
|
||||
String longString = strip(baseId, AMQP_ULONG_PREFIX_LENGTH);
|
||||
return UnsignedLong.valueOf(longString);
|
||||
} else if (hasAmqpStringPrefix(baseId)) {
|
||||
return strip(baseId, AMQP_STRING_PREFIX_LENGTH);
|
||||
} else if (hasAmqpBinaryPrefix(baseId)) {
|
||||
String hexString = strip(baseId, AMQP_BINARY_PREFIX_LENGTH);
|
||||
} else if (hasAmqpUlongPrefix(origId, JMS_ID_PREFIX_LENGTH)) {
|
||||
String ulongString = origId.substring(JMS_ID_PREFIX_LENGTH + AMQP_ULONG_PREFIX_LENGTH);
|
||||
return UnsignedLong.valueOf(ulongString);
|
||||
} else if (hasAmqpStringPrefix(origId, JMS_ID_PREFIX_LENGTH)) {
|
||||
return origId.substring(JMS_ID_PREFIX_LENGTH + AMQP_STRING_PREFIX_LENGTH);
|
||||
} else if (hasAmqpBinaryPrefix(origId, JMS_ID_PREFIX_LENGTH)) {
|
||||
String hexString = origId.substring(JMS_ID_PREFIX_LENGTH + AMQP_BINARY_PREFIX_LENGTH);
|
||||
byte[] bytes = convertHexStringToBinary(hexString);
|
||||
return new Binary(bytes);
|
||||
} else {
|
||||
// We have a string without any type prefix, transmit it as-is.
|
||||
return baseId;
|
||||
// We have a string without any encoding prefix needing processed,
|
||||
// so transmit it as-is, including the "ID:"
|
||||
return origId;
|
||||
}
|
||||
} catch (IllegalArgumentException e) {
|
||||
throw new ActiveMQAMQPIllegalStateException("Unable to convert ID value");
|
||||
} catch (IllegalArgumentException iae) {
|
||||
throw new ActiveMQAMQPIllegalStateException(iae.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Convert the provided hex-string into a binary representation where each byte represents
|
||||
* two characters of the hex string.
|
||||
* <p>
|
||||
* Convert the provided hex-string into a binary representation where each
|
||||
* byte represents two characters of the hex string.
|
||||
*
|
||||
* The hex characters may be upper or lower case.
|
||||
*
|
||||
* @param hexString
|
||||
* string to convert to a binary value.
|
||||
* string to convert
|
||||
* @return a byte array containing the binary representation
|
||||
* @throws IllegalArgumentException
|
||||
* if the provided String is a non-even length or contains non-hex characters
|
||||
* if the provided String is a non-even length or contains non-hex
|
||||
* characters
|
||||
*/
|
||||
public byte[] convertHexStringToBinary(String hexString) throws IllegalArgumentException {
|
||||
int length = hexString.length();
|
||||
|
||||
// As each byte needs two characters in the hex encoding, the string must be an even
|
||||
// length.
|
||||
// As each byte needs two characters in the hex encoding, the string must
|
||||
// be an even length.
|
||||
if (length % 2 != 0) {
|
||||
throw new IllegalArgumentException("The provided hex String must be an even length, but was of length " + length + ": " + hexString);
|
||||
}
|
||||
|
@ -179,14 +287,32 @@ public class AMQPMessageIdHelper {
|
|||
return binary;
|
||||
}
|
||||
|
||||
private int hexCharToInt(char ch, String orig) throws IllegalArgumentException {
|
||||
if (ch >= '0' && ch <= '9') {
|
||||
// subtract '0' to get difference in position as an int
|
||||
return ch - '0';
|
||||
} else if (ch >= 'A' && ch <= 'F') {
|
||||
// subtract 'A' to get difference in position as an int
|
||||
// and then add 10 for the offset of 'A'
|
||||
return ch - 'A' + 10;
|
||||
} else if (ch >= 'a' && ch <= 'f') {
|
||||
// subtract 'a' to get difference in position as an int
|
||||
// and then add 10 for the offset of 'a'
|
||||
return ch - 'a' + 10;
|
||||
}
|
||||
|
||||
throw new IllegalArgumentException("The provided hex string contains non-hex character '" + ch + "': " + orig);
|
||||
}
|
||||
|
||||
/**
|
||||
* Convert the provided binary into a hex-string representation where each character
|
||||
* represents 4 bits of the provided binary, i.e each byte requires two characters.
|
||||
* <p>
|
||||
* Convert the provided binary into a hex-string representation where each
|
||||
* character represents 4 bits of the provided binary, i.e each byte requires
|
||||
* two characters.
|
||||
*
|
||||
* The returned hex characters are upper-case.
|
||||
*
|
||||
* @param bytes
|
||||
* the binary value to convert to a hex String instance.
|
||||
* binary to convert
|
||||
* @return a String containing a hex representation of the bytes
|
||||
*/
|
||||
public String convertBinaryToHexString(byte[] bytes) {
|
||||
|
@ -206,47 +332,4 @@ public class AMQPMessageIdHelper {
|
|||
|
||||
return builder.toString();
|
||||
}
|
||||
|
||||
// ----- Internal implementation ------------------------------------------//
|
||||
|
||||
private boolean hasTypeEncodingPrefix(String stringId) {
|
||||
return hasAmqpBinaryPrefix(stringId) || hasAmqpUuidPrefix(stringId) || hasAmqpUlongPrefix(stringId) || hasAmqpStringPrefix(stringId);
|
||||
}
|
||||
|
||||
private boolean hasAmqpStringPrefix(String stringId) {
|
||||
return stringId.startsWith(AMQP_STRING_PREFIX);
|
||||
}
|
||||
|
||||
private boolean hasAmqpUlongPrefix(String stringId) {
|
||||
return stringId.startsWith(AMQP_ULONG_PREFIX);
|
||||
}
|
||||
|
||||
private boolean hasAmqpUuidPrefix(String stringId) {
|
||||
return stringId.startsWith(AMQP_UUID_PREFIX);
|
||||
}
|
||||
|
||||
private boolean hasAmqpBinaryPrefix(String stringId) {
|
||||
return stringId.startsWith(AMQP_BINARY_PREFIX);
|
||||
}
|
||||
|
||||
private String strip(String id, int numChars) {
|
||||
return id.substring(numChars);
|
||||
}
|
||||
|
||||
private int hexCharToInt(char ch, String orig) throws IllegalArgumentException {
|
||||
if (ch >= '0' && ch <= '9') {
|
||||
// subtract '0' to get difference in position as an int
|
||||
return ch - '0';
|
||||
} else if (ch >= 'A' && ch <= 'F') {
|
||||
// subtract 'A' to get difference in position as an int
|
||||
// and then add 10 for the offset of 'A'
|
||||
return ch - 'A' + 10;
|
||||
} else if (ch >= 'a' && ch <= 'f') {
|
||||
// subtract 'a' to get difference in position as an int
|
||||
// and then add 10 for the offset of 'a'
|
||||
return ch - 'a' + 10;
|
||||
}
|
||||
|
||||
throw new IllegalArgumentException("The provided hex string contains non-hex character '" + ch + "': " + orig);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -17,8 +17,27 @@
|
|||
|
||||
package org.apache.activemq.artemis.protocol.amqp.converter;
|
||||
|
||||
import javax.jms.DeliveryMode;
|
||||
import javax.jms.JMSException;
|
||||
import static org.apache.activemq.artemis.api.core.Message.HDR_SCHEDULED_DELIVERY_TIME;
|
||||
import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_CONTENT_ENCODING;
|
||||
import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_CONTENT_TYPE;
|
||||
import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_FIRST_ACQUIRER;
|
||||
import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_FOOTER_PREFIX;
|
||||
import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_HEADER;
|
||||
import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_HEADER_DURABLE;
|
||||
import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_HEADER_PRIORITY;
|
||||
import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_MESSAGE_ANNOTATION_PREFIX;
|
||||
import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_REPLYTO_GROUP_ID;
|
||||
import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.OCTET_STREAM_CONTENT_TYPE;
|
||||
import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.SERIALIZED_JAVA_OBJECT_CONTENT_TYPE;
|
||||
import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.createBytesMessage;
|
||||
import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.createMapMessage;
|
||||
import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.createMessage;
|
||||
import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.createObjectMessage;
|
||||
import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.createStreamMessage;
|
||||
import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.createTextMessage;
|
||||
import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.getCharsetForTextualContent;
|
||||
import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.isContentType;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.CharBuffer;
|
||||
import java.nio.charset.CharacterCodingException;
|
||||
|
@ -28,8 +47,9 @@ import java.util.List;
|
|||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import io.netty.buffer.PooledByteBufAllocator;
|
||||
import javax.jms.DeliveryMode;
|
||||
import javax.jms.JMSException;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.ICoreMessage;
|
||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
|
||||
|
@ -59,26 +79,8 @@ import org.apache.qpid.proton.amqp.messaging.Properties;
|
|||
import org.apache.qpid.proton.amqp.messaging.Section;
|
||||
import org.apache.qpid.proton.codec.WritableBuffer;
|
||||
|
||||
import static org.apache.activemq.artemis.api.core.Message.HDR_SCHEDULED_DELIVERY_TIME;
|
||||
import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_CONTENT_ENCODING;
|
||||
import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_CONTENT_TYPE;
|
||||
import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_FIRST_ACQUIRER;
|
||||
import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_FOOTER_PREFIX;
|
||||
import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_HEADER;
|
||||
import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_HEADER_DURABLE;
|
||||
import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_HEADER_PRIORITY;
|
||||
import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_MESSAGE_ANNOTATION_PREFIX;
|
||||
import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_REPLYTO_GROUP_ID;
|
||||
import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.OCTET_STREAM_CONTENT_TYPE;
|
||||
import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.SERIALIZED_JAVA_OBJECT_CONTENT_TYPE;
|
||||
import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.createBytesMessage;
|
||||
import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.createMapMessage;
|
||||
import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.createMessage;
|
||||
import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.createObjectMessage;
|
||||
import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.createStreamMessage;
|
||||
import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.createTextMessage;
|
||||
import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.getCharsetForTextualContent;
|
||||
import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.isContentType;
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import io.netty.buffer.PooledByteBufAllocator;
|
||||
|
||||
/**
|
||||
* This class was created just to separate concerns on AMQPConverter.
|
||||
|
@ -86,6 +88,7 @@ import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSup
|
|||
* */
|
||||
public class AmqpCoreConverter {
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public static ICoreMessage toCore(AMQPMessage message) throws Exception {
|
||||
|
||||
Section body = message.getProtonMessage().getBody();
|
||||
|
@ -189,6 +192,7 @@ public class AmqpCoreConverter {
|
|||
return result != null ? result.getInnerMessage() : null;
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
protected static ServerJMSMessage populateMessage(ServerJMSMessage jms, org.apache.qpid.proton.message.Message amqp) throws Exception {
|
||||
Header header = amqp.getHeader();
|
||||
if (header != null) {
|
||||
|
@ -250,11 +254,10 @@ public class AmqpCoreConverter {
|
|||
final Properties properties = amqp.getProperties();
|
||||
if (properties != null) {
|
||||
if (properties.getMessageId() != null) {
|
||||
jms.setJMSMessageID(AMQPMessageIdHelper.INSTANCE.toBaseMessageIdString(properties.getMessageId()));
|
||||
jms.setJMSMessageID(AMQPMessageIdHelper.INSTANCE.toMessageIdString(properties.getMessageId()));
|
||||
}
|
||||
Binary userId = properties.getUserId();
|
||||
if (userId != null) {
|
||||
// TODO - Better Way to set this?
|
||||
jms.setStringProperty("JMSXUserID", new String(userId.getArray(), userId.getArrayOffset(), userId.getLength(), StandardCharsets.UTF_8));
|
||||
}
|
||||
if (properties.getTo() != null) {
|
||||
|
@ -267,7 +270,7 @@ public class AmqpCoreConverter {
|
|||
jms.setJMSReplyTo(new ServerDestination(properties.getReplyTo()));
|
||||
}
|
||||
if (properties.getCorrelationId() != null) {
|
||||
jms.setJMSCorrelationID(AMQPMessageIdHelper.INSTANCE.toBaseMessageIdString(properties.getCorrelationId()));
|
||||
jms.setJMSCorrelationID(AMQPMessageIdHelper.INSTANCE.toCorrelationIdString(properties.getCorrelationId()));
|
||||
}
|
||||
if (properties.getContentType() != null) {
|
||||
jms.setStringProperty(JMS_AMQP_CONTENT_TYPE, properties.getContentType().toString());
|
||||
|
@ -357,6 +360,4 @@ public class AmqpCoreConverter {
|
|||
msg.setObjectProperty(key, value);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
|
|
@ -1,5 +1,4 @@
|
|||
/*
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
|
@ -16,177 +15,542 @@
|
|||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*
|
||||
*/
|
||||
package org.apache.activemq.artemis.protocol.amqp.converter.message;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
import java.util.UUID;
|
||||
|
||||
import org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageIdHelper;
|
||||
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException;
|
||||
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPIllegalStateException;
|
||||
import org.apache.qpid.proton.amqp.Binary;
|
||||
import org.apache.qpid.proton.amqp.UnsignedLong;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertNull;
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
public class AMQPMessageIdHelperTest {
|
||||
|
||||
private AMQPMessageIdHelper messageIdHelper;
|
||||
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
messageIdHelper = new AMQPMessageIdHelper();
|
||||
messageIdHelper = AMQPMessageIdHelper.INSTANCE;
|
||||
}
|
||||
|
||||
/**
|
||||
* Test that {@link AMQPMessageIdHelper#toBaseMessageIdString(Object)} returns null if given
|
||||
* null
|
||||
* Test that {@link AMQPMessageIdHelper#hasMessageIdPrefix(String)} returns
|
||||
* true for strings that begin "ID:"
|
||||
*/
|
||||
@Test
|
||||
public void testToBaseMessageIdStringWithNull() {
|
||||
public void testHasIdPrefixWithPrefix() {
|
||||
String myId = "ID:something";
|
||||
assertTrue("'ID:' prefix should have been identified", messageIdHelper.hasMessageIdPrefix(myId));
|
||||
}
|
||||
|
||||
/**
|
||||
* Test that {@link AMQPMessageIdHelper#hasMessageIdPrefix(String)} returns
|
||||
* false for string beings "ID" without colon.
|
||||
*/
|
||||
@Test
|
||||
public void testHasIdPrefixWithIDButNoColonPrefix() {
|
||||
String myIdNoColon = "IDsomething";
|
||||
assertFalse("'ID' prefix should not have been identified without trailing colon", messageIdHelper.hasMessageIdPrefix(myIdNoColon));
|
||||
}
|
||||
|
||||
/**
|
||||
* Test that {@link AMQPMessageIdHelper#hasMessageIdPrefix(String)} returns
|
||||
* false for null
|
||||
*/
|
||||
@Test
|
||||
public void testHasIdPrefixWithNull() {
|
||||
String nullString = null;
|
||||
assertNull("null string should have been returned", messageIdHelper.toBaseMessageIdString(nullString));
|
||||
assertFalse("null string should not result in identification as having the prefix", messageIdHelper.hasMessageIdPrefix(nullString));
|
||||
}
|
||||
|
||||
/**
|
||||
* Test that {@link AMQPMessageIdHelper#toBaseMessageIdString(Object)} throws an IAE if given
|
||||
* an unexpected object type.
|
||||
* Test that {@link AMQPMessageIdHelper#hasMessageIdPrefix(String)} returns
|
||||
* false for strings that doesnt have "ID:" anywhere
|
||||
*/
|
||||
@Test
|
||||
public void testToBaseMessageIdStringThrowsIAEWithUnexpectedType() {
|
||||
public void testHasIdPrefixWithoutPrefix() {
|
||||
String myNonId = "something";
|
||||
assertFalse("string without 'ID:' anywhere should not have been identified as having the prefix", messageIdHelper.hasMessageIdPrefix(myNonId));
|
||||
}
|
||||
|
||||
/**
|
||||
* Test that {@link AMQPMessageIdHelper#hasMessageIdPrefix(String)} returns
|
||||
* false for strings has lowercase "id:" prefix
|
||||
*/
|
||||
@Test
|
||||
public void testHasIdPrefixWithLowercaseID() {
|
||||
String myLowerCaseNonId = "id:something";
|
||||
assertFalse("lowercase 'id:' prefix should not result in identification as having 'ID:' prefix", messageIdHelper.hasMessageIdPrefix(myLowerCaseNonId));
|
||||
}
|
||||
|
||||
/**
|
||||
* Test that {@link AMQPMessageIdHelper#toMessageIdString(Object)} returns
|
||||
* null if given null
|
||||
*/
|
||||
@Test
|
||||
public void testToMessageIdStringWithNull() {
|
||||
assertNull("null string should have been returned", messageIdHelper.toMessageIdString(null));
|
||||
}
|
||||
|
||||
/**
|
||||
* Test that {@link AMQPMessageIdHelper#toMessageIdString(Object)} throws an
|
||||
* IAE if given an unexpected object type.
|
||||
*/
|
||||
@Test
|
||||
public void testToMessageIdStringThrowsIAEWithUnexpectedType() {
|
||||
try {
|
||||
messageIdHelper.toBaseMessageIdString(new Object());
|
||||
messageIdHelper.toMessageIdString(new Object());
|
||||
fail("expected exception not thrown");
|
||||
} catch (IllegalArgumentException iae) {
|
||||
// expected
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Test that {@link AMQPMessageIdHelper#toBaseMessageIdString(Object)} returns the given
|
||||
* basic string unchanged
|
||||
*/
|
||||
@Test
|
||||
public void testToBaseMessageIdStringWithString() {
|
||||
String stringMessageId = "myIdString";
|
||||
|
||||
String baseMessageIdString = messageIdHelper.toBaseMessageIdString(stringMessageId);
|
||||
assertNotNull("null string should not have been returned", baseMessageIdString);
|
||||
assertEquals("expected base id string was not returned", stringMessageId, baseMessageIdString);
|
||||
private void doToMessageIdTestImpl(Object idObject, String expected) {
|
||||
String idString = messageIdHelper.toMessageIdString(idObject);
|
||||
assertNotNull("null string should not have been returned", idString);
|
||||
assertEquals("expected id string was not returned", expected, idString);
|
||||
}
|
||||
|
||||
/**
|
||||
* Test that {@link AMQPMessageIdHelper#toBaseMessageIdString(Object)} returns a string
|
||||
* indicating an AMQP encoded string, when the given string happens to already begin with the
|
||||
* {@link AMQPMessageIdHelper#AMQP_UUID_PREFIX}.
|
||||
* Test that {@link AMQPMessageIdHelper#toMessageIdString(Object)} returns
|
||||
* the given basic "ID:content" string unchanged.
|
||||
*/
|
||||
@Test
|
||||
public void testToBaseMessageIdStringWithStringBeginningWithEncodingPrefixForUUID() {
|
||||
public void testToMessageIdStringWithString() {
|
||||
String stringId = "ID:myIdString";
|
||||
|
||||
doToMessageIdTestImpl(stringId, stringId);
|
||||
}
|
||||
|
||||
/**
|
||||
* Test that {@link AMQPMessageIdHelper#toMessageIdString(Object)} returns
|
||||
* the given basic string with the 'no prefix' prefix and "ID:" prefix.
|
||||
*/
|
||||
@Test
|
||||
public void testToMessageIdStringWithStringNoPrefix() {
|
||||
String stringId = "myIdStringNoPrefix";
|
||||
String expected = AMQPMessageIdHelper.JMS_ID_PREFIX + AMQPMessageIdHelper.AMQP_NO_PREFIX + stringId;
|
||||
|
||||
doToMessageIdTestImpl(stringId, expected);
|
||||
}
|
||||
|
||||
/**
|
||||
* Test that {@link AMQPMessageIdHelper#toMessageIdString(Object)} returns a
|
||||
* string indicating lack of "ID:" prefix, when the given string happens to
|
||||
* begin with the {@link AMQPMessageIdHelper#AMQP_UUID_PREFIX}.
|
||||
*/
|
||||
@Test
|
||||
public void testToMessageIdStringWithStringBeginningWithEncodingPrefixForUUID() {
|
||||
String uuidStringMessageId = AMQPMessageIdHelper.AMQP_UUID_PREFIX + UUID.randomUUID();
|
||||
String expected = AMQPMessageIdHelper.AMQP_STRING_PREFIX + uuidStringMessageId;
|
||||
String expected = AMQPMessageIdHelper.JMS_ID_PREFIX + AMQPMessageIdHelper.AMQP_NO_PREFIX + uuidStringMessageId;
|
||||
|
||||
String baseMessageIdString = messageIdHelper.toBaseMessageIdString(uuidStringMessageId);
|
||||
assertNotNull("null string should not have been returned", baseMessageIdString);
|
||||
assertEquals("expected base id string was not returned", expected, baseMessageIdString);
|
||||
doToMessageIdTestImpl(uuidStringMessageId, expected);
|
||||
}
|
||||
|
||||
/**
|
||||
* Test that {@link AMQPMessageIdHelper#toBaseMessageIdString(Object)} returns a string
|
||||
* indicating an AMQP encoded string, when the given string happens to already begin with the
|
||||
* {@link AMQPMessageIdHelper#AMQP_ULONG_PREFIX}.
|
||||
* Test that {@link AMQPMessageIdHelper#toMessageIdString(Object)} returns a
|
||||
* string indicating lack of "ID:" prefix, when the given string happens to
|
||||
* begin with the {@link AMQPMessageIdHelper#AMQP_ULONG_PREFIX}.
|
||||
*/
|
||||
@Test
|
||||
public void testToBaseMessageIdStringWithStringBeginningWithEncodingPrefixForLong() {
|
||||
public void testToMessageIdStringWithStringBeginningWithEncodingPrefixForLong() {
|
||||
String longStringMessageId = AMQPMessageIdHelper.AMQP_ULONG_PREFIX + Long.valueOf(123456789L);
|
||||
String expected = AMQPMessageIdHelper.AMQP_STRING_PREFIX + longStringMessageId;
|
||||
String expected = AMQPMessageIdHelper.JMS_ID_PREFIX + AMQPMessageIdHelper.AMQP_NO_PREFIX + longStringMessageId;
|
||||
|
||||
String baseMessageIdString = messageIdHelper.toBaseMessageIdString(longStringMessageId);
|
||||
assertNotNull("null string should not have been returned", baseMessageIdString);
|
||||
assertEquals("expected base id string was not returned", expected, baseMessageIdString);
|
||||
doToMessageIdTestImpl(longStringMessageId, expected);
|
||||
}
|
||||
|
||||
/**
|
||||
* Test that {@link AMQPMessageIdHelper#toBaseMessageIdString(Object)} returns a string
|
||||
* indicating an AMQP encoded string, when the given string happens to already begin with the
|
||||
* {@link AMQPMessageIdHelper#AMQP_BINARY_PREFIX}.
|
||||
* Test that {@link AMQPMessageIdHelper#toMessageIdString(Object)} returns a
|
||||
* string indicating lack of "ID:" prefix, when the given string happens to
|
||||
* begin with the {@link AMQPMessageIdHelper#AMQP_BINARY_PREFIX}.
|
||||
*/
|
||||
@Test
|
||||
public void testToBaseMessageIdStringWithStringBeginningWithEncodingPrefixForBinary() {
|
||||
public void testToMessageIdStringWithStringBeginningWithEncodingPrefixForBinary() {
|
||||
String binaryStringMessageId = AMQPMessageIdHelper.AMQP_BINARY_PREFIX + "0123456789ABCDEF";
|
||||
String expected = AMQPMessageIdHelper.AMQP_STRING_PREFIX + binaryStringMessageId;
|
||||
String expected = AMQPMessageIdHelper.JMS_ID_PREFIX + AMQPMessageIdHelper.AMQP_NO_PREFIX + binaryStringMessageId;
|
||||
|
||||
String baseMessageIdString = messageIdHelper.toBaseMessageIdString(binaryStringMessageId);
|
||||
assertNotNull("null string should not have been returned", baseMessageIdString);
|
||||
assertEquals("expected base id string was not returned", expected, baseMessageIdString);
|
||||
doToMessageIdTestImpl(binaryStringMessageId, expected);
|
||||
}
|
||||
|
||||
/**
|
||||
* Test that {@link AMQPMessageIdHelper#toBaseMessageIdString(Object)} returns a string
|
||||
* indicating an AMQP encoded string (effectively twice), when the given string happens to
|
||||
* already begin with the {@link AMQPMessageIdHelper#AMQP_STRING_PREFIX}.
|
||||
* Test that {@link AMQPMessageIdHelper#toMessageIdString(Object)} returns a
|
||||
* string indicating lack of "ID:" prefix, when the given string happens to
|
||||
* begin with the {@link AMQPMessageIdHelper#AMQP_STRING_PREFIX}.
|
||||
*/
|
||||
@Test
|
||||
public void testToBaseMessageIdStringWithStringBeginningWithEncodingPrefixForString() {
|
||||
public void testToMessageIdStringWithStringBeginningWithEncodingPrefixForString() {
|
||||
String stringMessageId = AMQPMessageIdHelper.AMQP_STRING_PREFIX + "myStringId";
|
||||
String expected = AMQPMessageIdHelper.AMQP_STRING_PREFIX + stringMessageId;
|
||||
String expected = AMQPMessageIdHelper.JMS_ID_PREFIX + AMQPMessageIdHelper.AMQP_NO_PREFIX + stringMessageId;
|
||||
|
||||
String baseMessageIdString = messageIdHelper.toBaseMessageIdString(stringMessageId);
|
||||
assertNotNull("null string should not have been returned", baseMessageIdString);
|
||||
assertEquals("expected base id string was not returned", expected, baseMessageIdString);
|
||||
doToMessageIdTestImpl(stringMessageId, expected);
|
||||
}
|
||||
|
||||
/**
|
||||
* Test that {@link AMQPMessageIdHelper#toBaseMessageIdString(Object)} returns a string
|
||||
* indicating an AMQP encoded UUID when given a UUID object.
|
||||
* Test that {@link AMQPMessageIdHelper#toMessageIdString(Object)} returns a
|
||||
* string indicating lack of "ID:" prefix, effectively twice, when the given
|
||||
* string happens to begin with the
|
||||
* {@link AMQPMessageIdHelper#AMQP_NO_PREFIX}.
|
||||
*/
|
||||
@Test
|
||||
public void testToBaseMessageIdStringWithUUID() {
|
||||
public void testToMessageIdStringWithStringBeginningWithEncodingPrefixForNoIdPrefix() {
|
||||
String stringMessageId = AMQPMessageIdHelper.AMQP_NO_PREFIX + "myStringId";
|
||||
String expected = AMQPMessageIdHelper.JMS_ID_PREFIX + AMQPMessageIdHelper.AMQP_NO_PREFIX + stringMessageId;
|
||||
|
||||
doToMessageIdTestImpl(stringMessageId, expected);
|
||||
}
|
||||
|
||||
/**
|
||||
* Test that {@link AMQPMessageIdHelper#toMessageIdString(Object)} returns a
|
||||
* string indicating an AMQP encoded UUID when given a UUID object.
|
||||
*/
|
||||
@Test
|
||||
public void testToMessageIdStringWithUUID() {
|
||||
UUID uuidMessageId = UUID.randomUUID();
|
||||
String expected = AMQPMessageIdHelper.AMQP_UUID_PREFIX + uuidMessageId.toString();
|
||||
String expected = AMQPMessageIdHelper.JMS_ID_PREFIX + AMQPMessageIdHelper.AMQP_UUID_PREFIX + uuidMessageId.toString();
|
||||
|
||||
String baseMessageIdString = messageIdHelper.toBaseMessageIdString(uuidMessageId);
|
||||
assertNotNull("null string should not have been returned", baseMessageIdString);
|
||||
assertEquals("expected base id string was not returned", expected, baseMessageIdString);
|
||||
doToMessageIdTestImpl(uuidMessageId, expected);
|
||||
}
|
||||
|
||||
/**
|
||||
* Test that {@link AMQPMessageIdHelper#toBaseMessageIdString(Object)} returns a string
|
||||
* indicating an AMQP encoded ulong when given a UnsignedLong object.
|
||||
* Test that {@link AMQPMessageIdHelper#toMessageIdString(Object)} returns a
|
||||
* string indicating an AMQP encoded ulong when given a UnsignedLong object.
|
||||
*/
|
||||
@Test
|
||||
public void testToBaseMessageIdStringWithUnsignedLong() {
|
||||
public void testToMessageIdStringWithUnsignedLong() {
|
||||
UnsignedLong uLongMessageId = UnsignedLong.valueOf(123456789L);
|
||||
String expected = AMQPMessageIdHelper.AMQP_ULONG_PREFIX + uLongMessageId.toString();
|
||||
String expected = AMQPMessageIdHelper.JMS_ID_PREFIX + AMQPMessageIdHelper.AMQP_ULONG_PREFIX + uLongMessageId.toString();
|
||||
|
||||
String baseMessageIdString = messageIdHelper.toBaseMessageIdString(uLongMessageId);
|
||||
assertNotNull("null string should not have been returned", baseMessageIdString);
|
||||
assertEquals("expected base id string was not returned", expected, baseMessageIdString);
|
||||
doToMessageIdTestImpl(uLongMessageId, expected);
|
||||
}
|
||||
|
||||
/**
|
||||
* Test that {@link AMQPMessageIdHelper#toBaseMessageIdString(Object)} returns a string
|
||||
* indicating an AMQP encoded binary when given a Binary object.
|
||||
* Test that {@link AMQPMessageIdHelper#toMessageIdString(Object)} returns a
|
||||
* string indicating an AMQP encoded binary when given a Binary object.
|
||||
*/
|
||||
@Test
|
||||
public void testToBaseMessageIdStringWithBinary() {
|
||||
public void testToMessageIdStringWithBinary() {
|
||||
byte[] bytes = new byte[] {(byte) 0x00, (byte) 0xAB, (byte) 0x09, (byte) 0xFF};
|
||||
Binary binary = new Binary(bytes);
|
||||
|
||||
String expected = AMQPMessageIdHelper.AMQP_BINARY_PREFIX + "00AB09FF";
|
||||
String expected = AMQPMessageIdHelper.JMS_ID_PREFIX + AMQPMessageIdHelper.AMQP_BINARY_PREFIX + "00AB09FF";
|
||||
|
||||
String baseMessageIdString = messageIdHelper.toBaseMessageIdString(binary);
|
||||
assertNotNull("null string should not have been returned", baseMessageIdString);
|
||||
assertEquals("expected base id string was not returned", expected, baseMessageIdString);
|
||||
doToMessageIdTestImpl(binary, expected);
|
||||
}
|
||||
|
||||
/**
|
||||
* Test that {@link AMQPMessageIdHelper#toIdObject(String)} returns an UnsignedLong when
|
||||
* given a string indicating an encoded AMQP ulong id.
|
||||
* Test that {@link AMQPMessageIdHelper#toMessageIdString(Object)} returns a
|
||||
* string indicating an escaped string, when given an input string that
|
||||
* already has the "ID:" prefix, but follows it with an encoding prefix, in
|
||||
* this case the {@link AMQPMessageIdHelper#AMQP_STRING_PREFIX}.
|
||||
*/
|
||||
@Test
|
||||
public void testToMessageIdStringWithStringBeginningWithIdAndEncodingPrefixForString() {
|
||||
String unescapedStringPrefixMessageId = AMQPMessageIdHelper.JMS_ID_PREFIX + AMQPMessageIdHelper.AMQP_STRING_PREFIX + "id-content";
|
||||
String expected = AMQPMessageIdHelper.JMS_ID_PREFIX + AMQPMessageIdHelper.AMQP_STRING_PREFIX + unescapedStringPrefixMessageId;
|
||||
|
||||
doToMessageIdTestImpl(unescapedStringPrefixMessageId, expected);
|
||||
}
|
||||
|
||||
/**
|
||||
* Test that {@link AMQPMessageIdHelper#toMessageIdString(Object)} returns a
|
||||
* string indicating an escaped string, when given an input string that
|
||||
* already has the "ID:" prefix, but follows it with an encoding prefix, in
|
||||
* this case the {@link AMQPMessageIdHelper#AMQP_UUID_PREFIX}.
|
||||
*/
|
||||
@Test
|
||||
public void testToMessageIdStringWithStringBeginningWithIdAndEncodingPrefixForUUID() {
|
||||
String unescapedUuidPrefixMessageId = AMQPMessageIdHelper.JMS_ID_PREFIX + AMQPMessageIdHelper.AMQP_UUID_PREFIX + UUID.randomUUID();
|
||||
String expected = AMQPMessageIdHelper.JMS_ID_PREFIX + AMQPMessageIdHelper.AMQP_STRING_PREFIX + unescapedUuidPrefixMessageId;
|
||||
|
||||
doToMessageIdTestImpl(unescapedUuidPrefixMessageId, expected);
|
||||
}
|
||||
|
||||
/**
|
||||
* Test that {@link AMQPMessageIdHelper#toMessageIdString(Object)} returns a
|
||||
* string indicating an escaped string, when given an input string that
|
||||
* already has the "ID:" prefix, but follows it with an encoding prefix, in
|
||||
* this case the {@link AMQPMessageIdHelper#AMQP_ULONG_PREFIX}.
|
||||
*/
|
||||
@Test
|
||||
public void testToMessageIdStringWithStringBeginningWithIdAndEncodingPrefixForUlong() {
|
||||
String unescapedUlongPrefixMessageId = AMQPMessageIdHelper.JMS_ID_PREFIX + AMQPMessageIdHelper.AMQP_ULONG_PREFIX + "42";
|
||||
String expected = AMQPMessageIdHelper.JMS_ID_PREFIX + AMQPMessageIdHelper.AMQP_STRING_PREFIX + unescapedUlongPrefixMessageId;
|
||||
|
||||
doToMessageIdTestImpl(unescapedUlongPrefixMessageId, expected);
|
||||
}
|
||||
|
||||
/**
|
||||
* Test that {@link AMQPMessageIdHelper#toMessageIdString(Object)} returns a
|
||||
* string indicating an escaped string, when given an input string that
|
||||
* already has the "ID:" prefix, but follows it with an encoding prefix, in
|
||||
* this case the {@link AMQPMessageIdHelper#AMQP_BINARY_PREFIX}.
|
||||
*/
|
||||
@Test
|
||||
public void testToMessageIdStringWithStringBeginningWithIdAndEncodingPrefixForBinary() {
|
||||
String unescapedBinaryPrefixMessageId = AMQPMessageIdHelper.JMS_ID_PREFIX + AMQPMessageIdHelper.AMQP_BINARY_PREFIX + "ABCDEF";
|
||||
String expected = AMQPMessageIdHelper.JMS_ID_PREFIX + AMQPMessageIdHelper.AMQP_STRING_PREFIX + unescapedBinaryPrefixMessageId;
|
||||
|
||||
doToMessageIdTestImpl(unescapedBinaryPrefixMessageId, expected);
|
||||
}
|
||||
|
||||
/**
|
||||
* Test that {@link AMQPMessageIdHelper#toMessageIdString(Object)} returns a
|
||||
* string indicating an escaped string, when given an input string that
|
||||
* already has the "ID:" prefix, but follows it with an encoding prefix, in
|
||||
* this case the {@link AMQPMessageIdHelper#AMQP_NO_PREFIX}.
|
||||
*/
|
||||
@Test
|
||||
public void testToMessageIdStringWithStringBeginningWithIdAndEncodingPrefixForNoIDPrefix() {
|
||||
String unescapedNoPrefixPrefixedMessageId = AMQPMessageIdHelper.JMS_ID_PREFIX + AMQPMessageIdHelper.AMQP_NO_PREFIX + "id-content";
|
||||
String expected = AMQPMessageIdHelper.JMS_ID_PREFIX + AMQPMessageIdHelper.AMQP_STRING_PREFIX + unescapedNoPrefixPrefixedMessageId;
|
||||
|
||||
doToMessageIdTestImpl(unescapedNoPrefixPrefixedMessageId, expected);
|
||||
}
|
||||
|
||||
/**
|
||||
* Test that {@link AMQPMessageIdHelper#toCorrelationIdString(Object)}
|
||||
* returns null if given null
|
||||
*/
|
||||
@Test
|
||||
public void testToCorrelationIdStringWithNull() {
|
||||
assertNull("null string should have been returned", messageIdHelper.toCorrelationIdString(null));
|
||||
}
|
||||
|
||||
/**
|
||||
* Test that {@link AMQPMessageIdHelper#toCorrelationIdString(Object)} throws
|
||||
* an IAE if given an unexpected object type.
|
||||
*/
|
||||
@Test
|
||||
public void testToCorrelationIdStringThrowsIAEWithUnexpectedType() {
|
||||
try {
|
||||
messageIdHelper.toCorrelationIdString(new Object());
|
||||
fail("expected exception not thrown");
|
||||
} catch (IllegalArgumentException iae) {
|
||||
// expected
|
||||
}
|
||||
}
|
||||
|
||||
private void doToCorrelationIDTestImpl(Object idObject, String expected) {
|
||||
String idString = messageIdHelper.toCorrelationIdString(idObject);
|
||||
assertNotNull("null string should not have been returned", idString);
|
||||
assertEquals("expected id string was not returned", expected, idString);
|
||||
}
|
||||
|
||||
/**
|
||||
* Test that {@link AMQPMessageIdHelper#toCorrelationIdString(Object)}
|
||||
* returns the given basic string unchanged when it has the "ID:" prefix (but
|
||||
* no others).
|
||||
*/
|
||||
@Test
|
||||
public void testToCorrelationIdStringWithString() {
|
||||
String stringId = "ID:myCorrelationIdString";
|
||||
|
||||
doToCorrelationIDTestImpl(stringId, stringId);
|
||||
}
|
||||
|
||||
/**
|
||||
* Test that {@link AMQPMessageIdHelper#toCorrelationIdString(Object)}
|
||||
* returns the given basic string unchanged when it lacks the "ID:" prefix
|
||||
* (and any others)
|
||||
*/
|
||||
@Test
|
||||
public void testToCorrelationIdStringWithStringNoPrefix() {
|
||||
String stringNoId = "myCorrelationIdString";
|
||||
|
||||
doToCorrelationIDTestImpl(stringNoId, stringNoId);
|
||||
}
|
||||
|
||||
/**
|
||||
* Test that {@link AMQPMessageIdHelper#toCorrelationIdString(Object)}
|
||||
* returns a string unchanged when it lacks the "ID:" prefix but happens to
|
||||
* already begin with the {@link AMQPMessageIdHelper#AMQP_UUID_PREFIX}.
|
||||
*/
|
||||
@Test
|
||||
public void testToCorrelationIdStringWithStringBeginningWithEncodingPrefixForUUID() {
|
||||
String uuidPrefixStringCorrelationId = AMQPMessageIdHelper.AMQP_UUID_PREFIX + UUID.randomUUID();
|
||||
|
||||
doToCorrelationIDTestImpl(uuidPrefixStringCorrelationId, uuidPrefixStringCorrelationId);
|
||||
}
|
||||
|
||||
/**
|
||||
* Test that {@link AMQPMessageIdHelper#toCorrelationIdString(Object)}
|
||||
* returns a string unchanged when it lacks the "ID:" prefix but happens to
|
||||
* already begin with the {@link AMQPMessageIdHelper#AMQP_ULONG_PREFIX}.
|
||||
*/
|
||||
@Test
|
||||
public void testToCorrelationIdStringWithStringBeginningWithEncodingPrefixForLong() {
|
||||
String ulongPrefixStringCorrelationId = AMQPMessageIdHelper.AMQP_ULONG_PREFIX + Long.valueOf(123456789L);
|
||||
|
||||
doToCorrelationIDTestImpl(ulongPrefixStringCorrelationId, ulongPrefixStringCorrelationId);
|
||||
}
|
||||
|
||||
/**
|
||||
* Test that {@link AMQPMessageIdHelper#toCorrelationIdString(Object)}
|
||||
* returns a string unchanged when it lacks the "ID:" prefix but happens to
|
||||
* already begin with the {@link AMQPMessageIdHelper#AMQP_BINARY_PREFIX}.
|
||||
*/
|
||||
@Test
|
||||
public void testToCorrelationIdStringWithStringBeginningWithEncodingPrefixForBinary() {
|
||||
String binaryPrefixStringCorrelationId = AMQPMessageIdHelper.AMQP_BINARY_PREFIX + "0123456789ABCDEF";
|
||||
|
||||
doToCorrelationIDTestImpl(binaryPrefixStringCorrelationId, binaryPrefixStringCorrelationId);
|
||||
}
|
||||
|
||||
/**
|
||||
* Test that {@link AMQPMessageIdHelper#toCorrelationIdString(Object)}
|
||||
* returns a string unchanged when it lacks the "ID:" prefix but happens to
|
||||
* already begin with the {@link AMQPMessageIdHelper#AMQP_STRING_PREFIX}.
|
||||
*/
|
||||
@Test
|
||||
public void testToCorrelationIdStringWithStringBeginningWithEncodingPrefixForString() {
|
||||
String stringPrefixCorrelationId = AMQPMessageIdHelper.AMQP_STRING_PREFIX + "myStringId";
|
||||
|
||||
doToCorrelationIDTestImpl(stringPrefixCorrelationId, stringPrefixCorrelationId);
|
||||
}
|
||||
|
||||
/**
|
||||
* Test that {@link AMQPMessageIdHelper#toCorrelationIdString(Object)}
|
||||
* returns a string unchanged when it lacks the "ID:" prefix but happens to
|
||||
* already begin with the {@link AMQPMessageIdHelper#AMQP_NO_PREFIX}.
|
||||
*/
|
||||
@Test
|
||||
public void testToCorrelationIdStringWithStringBeginningWithEncodingPrefixForNoIdPrefix() {
|
||||
String noPrefixStringCorrelationId = AMQPMessageIdHelper.AMQP_NO_PREFIX + "myStringId";
|
||||
|
||||
doToCorrelationIDTestImpl(noPrefixStringCorrelationId, noPrefixStringCorrelationId);
|
||||
}
|
||||
|
||||
/**
|
||||
* Test that {@link AMQPMessageIdHelper#toCorrelationIdString(Object)}
|
||||
* returns a string indicating an AMQP encoded UUID when given a UUID object.
|
||||
*/
|
||||
@Test
|
||||
public void testToCorrelationIdStringWithUUID() {
|
||||
UUID uuidCorrelationId = UUID.randomUUID();
|
||||
String expected = AMQPMessageIdHelper.JMS_ID_PREFIX + AMQPMessageIdHelper.AMQP_UUID_PREFIX + uuidCorrelationId.toString();
|
||||
|
||||
doToCorrelationIDTestImpl(uuidCorrelationId, expected);
|
||||
}
|
||||
|
||||
/**
|
||||
* Test that {@link AMQPMessageIdHelper#toCorrelationIdString(Object)}
|
||||
* returns a string indicating an AMQP encoded ulong when given a
|
||||
* UnsignedLong object.
|
||||
*/
|
||||
@Test
|
||||
public void testToCorrelationIdStringWithUnsignedLong() {
|
||||
UnsignedLong uLongCorrelationId = UnsignedLong.valueOf(123456789L);
|
||||
String expected = AMQPMessageIdHelper.JMS_ID_PREFIX + AMQPMessageIdHelper.AMQP_ULONG_PREFIX + uLongCorrelationId.toString();
|
||||
|
||||
doToCorrelationIDTestImpl(uLongCorrelationId, expected);
|
||||
}
|
||||
|
||||
/**
|
||||
* Test that {@link AMQPMessageIdHelper#toCorrelationIdString(Object)}
|
||||
* returns a string indicating an AMQP encoded binary when given a Binary
|
||||
* object.
|
||||
*/
|
||||
@Test
|
||||
public void testToCorrelationIdStringWithBinary() {
|
||||
byte[] bytes = new byte[] {(byte) 0x00, (byte) 0xAB, (byte) 0x09, (byte) 0xFF};
|
||||
Binary binary = new Binary(bytes);
|
||||
|
||||
String expected = AMQPMessageIdHelper.JMS_ID_PREFIX + AMQPMessageIdHelper.AMQP_BINARY_PREFIX + "00AB09FF";
|
||||
|
||||
doToCorrelationIDTestImpl(binary, expected);
|
||||
}
|
||||
|
||||
/**
|
||||
* Test that {@link AMQPMessageIdHelper#toCorrelationIdString(Object)}
|
||||
* returns a string indicating an escaped string, when given an input string
|
||||
* that already has the "ID:" prefix, but follows it with an encoding prefix,
|
||||
* in this case the {@link AMQPMessageIdHelper#AMQP_STRING_PREFIX}.
|
||||
*/
|
||||
@Test
|
||||
public void testToCorrelationIdStringWithStringBeginningWithIdAndEncodingPrefixForString() {
|
||||
String unescapedStringPrefixCorrelationId = AMQPMessageIdHelper.JMS_ID_PREFIX + AMQPMessageIdHelper.AMQP_STRING_PREFIX + "id-content";
|
||||
String expected = AMQPMessageIdHelper.JMS_ID_PREFIX + AMQPMessageIdHelper.AMQP_STRING_PREFIX + unescapedStringPrefixCorrelationId;
|
||||
|
||||
doToCorrelationIDTestImpl(unescapedStringPrefixCorrelationId, expected);
|
||||
}
|
||||
|
||||
/**
|
||||
* Test that {@link AMQPMessageIdHelper#toCorrelationIdString(Object)}
|
||||
* returns a string indicating an escaped string, when given an input string
|
||||
* that already has the "ID:" prefix, but follows it with an encoding prefix,
|
||||
* in this case the {@link AMQPMessageIdHelper#AMQP_UUID_PREFIX}.
|
||||
*/
|
||||
@Test
|
||||
public void testToCorrelationIdStringWithStringBeginningWithIdAndEncodingPrefixForUUID() {
|
||||
String unescapedUuidPrefixCorrelationId = AMQPMessageIdHelper.JMS_ID_PREFIX + AMQPMessageIdHelper.AMQP_UUID_PREFIX + UUID.randomUUID();
|
||||
String expected = AMQPMessageIdHelper.JMS_ID_PREFIX + AMQPMessageIdHelper.AMQP_STRING_PREFIX + unescapedUuidPrefixCorrelationId;
|
||||
|
||||
doToCorrelationIDTestImpl(unescapedUuidPrefixCorrelationId, expected);
|
||||
}
|
||||
|
||||
/**
|
||||
* Test that {@link AMQPMessageIdHelper#toCorrelationIdString(Object)}
|
||||
* returns a string indicating an escaped string, when given an input string
|
||||
* that already has the "ID:" prefix, but follows it with an encoding prefix,
|
||||
* in this case the {@link AMQPMessageIdHelper#AMQP_ULONG_PREFIX}.
|
||||
*/
|
||||
@Test
|
||||
public void testToCorrelationIdStringWithStringBeginningWithIdAndEncodingPrefixForUlong() {
|
||||
String unescapedUlongPrefixCorrelationId = AMQPMessageIdHelper.JMS_ID_PREFIX + AMQPMessageIdHelper.AMQP_ULONG_PREFIX + "42";
|
||||
String expected = AMQPMessageIdHelper.JMS_ID_PREFIX + AMQPMessageIdHelper.AMQP_STRING_PREFIX + unescapedUlongPrefixCorrelationId;
|
||||
|
||||
doToCorrelationIDTestImpl(unescapedUlongPrefixCorrelationId, expected);
|
||||
}
|
||||
|
||||
/**
|
||||
* Test that {@link AMQPMessageIdHelper#toCorrelationIdString(Object)}
|
||||
* returns a string indicating an escaped string, when given an input string
|
||||
* that already has the "ID:" prefix, but follows it with an encoding prefix,
|
||||
* in this case the {@link AMQPMessageIdHelper#AMQP_BINARY_PREFIX}.
|
||||
*/
|
||||
@Test
|
||||
public void testToCorrelationIdStringWithStringBeginningWithIdAndEncodingPrefixForBinary() {
|
||||
String unescapedBinaryPrefixCorrelationId = AMQPMessageIdHelper.JMS_ID_PREFIX + AMQPMessageIdHelper.AMQP_BINARY_PREFIX + "ABCDEF";
|
||||
String expected = AMQPMessageIdHelper.JMS_ID_PREFIX + AMQPMessageIdHelper.AMQP_STRING_PREFIX + unescapedBinaryPrefixCorrelationId;
|
||||
|
||||
doToCorrelationIDTestImpl(unescapedBinaryPrefixCorrelationId, expected);
|
||||
}
|
||||
|
||||
/**
|
||||
* Test that {@link AMQPMessageIdHelper#toCorrelationIdString(Object)}
|
||||
* returns a string indicating an escaped string, when given an input string
|
||||
* that already has the "ID:" prefix, but follows it with an encoding prefix,
|
||||
* in this case the {@link AMQPMessageIdHelper#AMQP_NO_PREFIX}.
|
||||
*/
|
||||
@Test
|
||||
public void testToCorrelationIdStringWithStringBeginningWithIdAndEncodingPrefixForNoIDPrefix() {
|
||||
String unescapedNoPrefixCorrelationId = AMQPMessageIdHelper.JMS_ID_PREFIX + AMQPMessageIdHelper.AMQP_NO_PREFIX + "id-content";
|
||||
String expected = AMQPMessageIdHelper.JMS_ID_PREFIX + AMQPMessageIdHelper.AMQP_STRING_PREFIX + unescapedNoPrefixCorrelationId;
|
||||
|
||||
doToCorrelationIDTestImpl(unescapedNoPrefixCorrelationId, expected);
|
||||
}
|
||||
|
||||
private void doToIdObjectTestImpl(String idString, Object expected) throws ActiveMQAMQPIllegalStateException {
|
||||
Object idObject = messageIdHelper.toIdObject(idString);
|
||||
assertNotNull("null object should not have been returned", idObject);
|
||||
assertEquals("expected id object was not returned", expected, idObject);
|
||||
}
|
||||
|
||||
/**
|
||||
* Test that {@link AMQPMessageIdHelper#toIdObject(String)} returns an
|
||||
* UnsignedLong when given a string indicating an encoded AMQP ulong id.
|
||||
*
|
||||
* @throws Exception
|
||||
* if an error occurs during the test.
|
||||
|
@ -194,16 +558,15 @@ public class AMQPMessageIdHelperTest {
|
|||
@Test
|
||||
public void testToIdObjectWithEncodedUlong() throws Exception {
|
||||
UnsignedLong longId = UnsignedLong.valueOf(123456789L);
|
||||
String provided = AMQPMessageIdHelper.AMQP_ULONG_PREFIX + "123456789";
|
||||
String provided = AMQPMessageIdHelper.JMS_ID_PREFIX + AMQPMessageIdHelper.AMQP_ULONG_PREFIX + "123456789";
|
||||
|
||||
Object idObject = messageIdHelper.toIdObject(provided);
|
||||
assertNotNull("null object should not have been returned", idObject);
|
||||
assertEquals("expected id object was not returned", longId, idObject);
|
||||
doToIdObjectTestImpl(provided, longId);
|
||||
}
|
||||
|
||||
/**
|
||||
* Test that {@link AMQPMessageIdHelper#toIdObject(String)} returns a Binary when given a
|
||||
* string indicating an encoded AMQP binary id, using upper case hex characters
|
||||
* Test that {@link AMQPMessageIdHelper#toIdObject(String)} returns a Binary
|
||||
* when given a string indicating an encoded AMQP binary id, using upper case
|
||||
* hex characters
|
||||
*
|
||||
* @throws Exception
|
||||
* if an error occurs during the test.
|
||||
|
@ -213,15 +576,14 @@ public class AMQPMessageIdHelperTest {
|
|||
byte[] bytes = new byte[] {(byte) 0x00, (byte) 0xAB, (byte) 0x09, (byte) 0xFF};
|
||||
Binary binaryId = new Binary(bytes);
|
||||
|
||||
String provided = AMQPMessageIdHelper.AMQP_BINARY_PREFIX + "00AB09FF";
|
||||
String provided = AMQPMessageIdHelper.JMS_ID_PREFIX + AMQPMessageIdHelper.AMQP_BINARY_PREFIX + "00AB09FF";
|
||||
|
||||
Object idObject = messageIdHelper.toIdObject(provided);
|
||||
assertNotNull("null object should not have been returned", idObject);
|
||||
assertEquals("expected id object was not returned", binaryId, idObject);
|
||||
doToIdObjectTestImpl(provided, binaryId);
|
||||
}
|
||||
|
||||
/**
|
||||
* Test that {@link AMQPMessageIdHelper#toIdObject(String)} returns null when given null.
|
||||
* Test that {@link AMQPMessageIdHelper#toIdObject(String)} returns null when
|
||||
* given null.
|
||||
*
|
||||
* @throws Exception
|
||||
* if an error occurs during the test.
|
||||
|
@ -232,8 +594,9 @@ public class AMQPMessageIdHelperTest {
|
|||
}
|
||||
|
||||
/**
|
||||
* Test that {@link AMQPMessageIdHelper#toIdObject(String)} returns a Binary when given a
|
||||
* string indicating an encoded AMQP binary id, using lower case hex characters.
|
||||
* Test that {@link AMQPMessageIdHelper#toIdObject(String)} returns a Binary
|
||||
* when given a string indicating an encoded AMQP binary id, using lower case
|
||||
* hex characters.
|
||||
*
|
||||
* @throws Exception
|
||||
* if an error occurs during the test.
|
||||
|
@ -243,16 +606,14 @@ public class AMQPMessageIdHelperTest {
|
|||
byte[] bytes = new byte[] {(byte) 0x00, (byte) 0xAB, (byte) 0x09, (byte) 0xFF};
|
||||
Binary binaryId = new Binary(bytes);
|
||||
|
||||
String provided = AMQPMessageIdHelper.AMQP_BINARY_PREFIX + "00ab09ff";
|
||||
String provided = AMQPMessageIdHelper.JMS_ID_PREFIX + AMQPMessageIdHelper.AMQP_BINARY_PREFIX + "00ab09ff";
|
||||
|
||||
Object idObject = messageIdHelper.toIdObject(provided);
|
||||
assertNotNull("null object should not have been returned", idObject);
|
||||
assertEquals("expected id object was not returned", binaryId, idObject);
|
||||
doToIdObjectTestImpl(provided, binaryId);
|
||||
}
|
||||
|
||||
/**
|
||||
* Test that {@link AMQPMessageIdHelper#toIdObject(String)} returns a UUID when given a
|
||||
* string indicating an encoded AMQP uuid id.
|
||||
* Test that {@link AMQPMessageIdHelper#toIdObject(String)} returns a UUID
|
||||
* when given a string indicating an encoded AMQP uuid id.
|
||||
*
|
||||
* @throws Exception
|
||||
* if an error occurs during the test.
|
||||
|
@ -260,133 +621,170 @@ public class AMQPMessageIdHelperTest {
|
|||
@Test
|
||||
public void testToIdObjectWithEncodedUuid() throws Exception {
|
||||
UUID uuid = UUID.randomUUID();
|
||||
String provided = AMQPMessageIdHelper.AMQP_UUID_PREFIX + uuid.toString();
|
||||
String provided = AMQPMessageIdHelper.JMS_ID_PREFIX + AMQPMessageIdHelper.AMQP_UUID_PREFIX + uuid.toString();
|
||||
|
||||
Object idObject = messageIdHelper.toIdObject(provided);
|
||||
assertNotNull("null object should not have been returned", idObject);
|
||||
assertEquals("expected id object was not returned", uuid, idObject);
|
||||
doToIdObjectTestImpl(provided, uuid);
|
||||
}
|
||||
|
||||
/**
|
||||
* Test that {@link AMQPMessageIdHelper#toIdObject(String)} returns a string when given a
|
||||
* string without any type encoding prefix.
|
||||
* Test that {@link AMQPMessageIdHelper#toIdObject(String)} returns a string
|
||||
* unchanged when given a string without any prefix.
|
||||
*
|
||||
* @throws Exception
|
||||
* if an error occurs during the test.
|
||||
*/
|
||||
@Test
|
||||
public void testToIdObjectWithStringContainingNoEncodingPrefix() throws Exception {
|
||||
public void testToIdObjectWithAppSpecificString() throws Exception {
|
||||
String stringId = "myStringId";
|
||||
|
||||
Object idObject = messageIdHelper.toIdObject(stringId);
|
||||
assertNotNull("null object should not have been returned", idObject);
|
||||
assertEquals("expected id object was not returned", stringId, idObject);
|
||||
doToIdObjectTestImpl(stringId, stringId);
|
||||
}
|
||||
|
||||
/**
|
||||
* Test that {@link AMQPMessageIdHelper#toIdObject(String)} returns the remainder of the
|
||||
* provided string after removing the {@link AMQPMessageIdHelper#AMQP_STRING_PREFIX} prefix.
|
||||
* Test that {@link AMQPMessageIdHelper#toIdObject(String)} returns a string
|
||||
* unchanged when given a string with only the 'ID:' prefix.
|
||||
*
|
||||
* @throws Exception
|
||||
* if an error occurs during the test.
|
||||
*/
|
||||
@Test
|
||||
public void testToIdObjectWithStringContainingStringEncodingPrefix() throws Exception {
|
||||
public void testToIdObjectWithSimplIdString() throws Exception {
|
||||
String stringId = "ID:myStringId";
|
||||
|
||||
doToIdObjectTestImpl(stringId, stringId);
|
||||
}
|
||||
|
||||
/**
|
||||
* Test that {@link AMQPMessageIdHelper#toIdObject(String)} returns the
|
||||
* remainder of the provided string after removing the 'ID:' and
|
||||
* {@link AMQPMessageIdHelper#AMQP_NO_PREFIX} prefix used to indicate it
|
||||
* originally had no 'ID:' prefix [when arriving as a message id].
|
||||
*
|
||||
* @throws Exception
|
||||
* if an error occurs during the test.
|
||||
*/
|
||||
@Test
|
||||
public void testToIdObjectWithStringContainingEncodingPrefixForNoIdPrefix() throws Exception {
|
||||
String suffix = "myStringSuffix";
|
||||
String stringId = AMQPMessageIdHelper.AMQP_STRING_PREFIX + suffix;
|
||||
String stringId = AMQPMessageIdHelper.JMS_ID_PREFIX + AMQPMessageIdHelper.AMQP_NO_PREFIX + suffix;
|
||||
|
||||
Object idObject = messageIdHelper.toIdObject(stringId);
|
||||
assertNotNull("null object should not have been returned", idObject);
|
||||
assertEquals("expected id object was not returned", suffix, idObject);
|
||||
doToIdObjectTestImpl(stringId, suffix);
|
||||
}
|
||||
|
||||
/**
|
||||
* Test that when given a string with with the {@link AMQPMessageIdHelper#AMQP_STRING_PREFIX}
|
||||
* prefix and then additionally the {@link AMQPMessageIdHelper#AMQP_UUID_PREFIX}, the
|
||||
* {@link AMQPMessageIdHelper#toIdObject(String)} method returns the remainder of the
|
||||
* provided string after removing the {@link AMQPMessageIdHelper#AMQP_STRING_PREFIX} prefix.
|
||||
* Test that {@link AMQPMessageIdHelper#toIdObject(String)} returns the
|
||||
* remainder of the provided string after removing the
|
||||
* {@link AMQPMessageIdHelper#AMQP_STRING_PREFIX} prefix.
|
||||
*
|
||||
* @throws Exception
|
||||
* if an error occurs during the test.
|
||||
*/
|
||||
@Test
|
||||
public void testToIdObjectWithStringContainingStringEncodingPrefixAndThenUuidPrefix() throws Exception {
|
||||
String encodedUuidString = AMQPMessageIdHelper.AMQP_UUID_PREFIX + UUID.randomUUID().toString();
|
||||
String stringId = AMQPMessageIdHelper.AMQP_STRING_PREFIX + encodedUuidString;
|
||||
public void testToIdObjectWithStringContainingIdStringEncodingPrefix() throws Exception {
|
||||
String suffix = "myStringSuffix";
|
||||
String stringId = AMQPMessageIdHelper.JMS_ID_PREFIX + AMQPMessageIdHelper.AMQP_STRING_PREFIX + suffix;
|
||||
|
||||
Object idObject = messageIdHelper.toIdObject(stringId);
|
||||
assertNotNull("null object should not have been returned", idObject);
|
||||
assertEquals("expected id object was not returned", encodedUuidString, idObject);
|
||||
doToIdObjectTestImpl(stringId, suffix);
|
||||
}
|
||||
|
||||
/**
|
||||
* Test that when given a string with with the
|
||||
* {@link AMQPMessageIdHelper#AMQP_STRING_PREFIX} prefix and then
|
||||
* additionally the {@link AMQPMessageIdHelper#AMQP_UUID_PREFIX}, the
|
||||
* {@link AMQPMessageIdHelper#toIdObject(String)} method returns the
|
||||
* remainder of the provided string after removing the
|
||||
* {@link AMQPMessageIdHelper#AMQP_STRING_PREFIX} prefix.
|
||||
*
|
||||
* @throws Exception
|
||||
* if an error occurs during the test.
|
||||
*/
|
||||
@Test
|
||||
public void testToIdObjectWithStringContainingIdStringEncodingPrefixAndThenUuidPrefix() throws Exception {
|
||||
String encodedUuidString = AMQPMessageIdHelper.AMQP_UUID_PREFIX + UUID.randomUUID().toString();
|
||||
String stringId = AMQPMessageIdHelper.JMS_ID_PREFIX + AMQPMessageIdHelper.AMQP_STRING_PREFIX + encodedUuidString;
|
||||
|
||||
doToIdObjectTestImpl(stringId, encodedUuidString);
|
||||
}
|
||||
|
||||
/**
|
||||
* Test that {@link AMQPMessageIdHelper#toIdObject(String)} throws an
|
||||
* {@link IdConversionException} when presented with an encoded binary hex string of uneven
|
||||
* length (after the prefix) that thus can't be converted due to each byte using 2 characters
|
||||
* {@link IdConversionException} when presented with an encoded binary hex
|
||||
* string of uneven length (after the prefix) that thus can't be converted
|
||||
* due to each byte using 2 characters
|
||||
*/
|
||||
@Test
|
||||
public void testToIdObjectWithStringContainingBinaryHexThrowsWithUnevenLengthString() {
|
||||
String unevenHead = AMQPMessageIdHelper.AMQP_BINARY_PREFIX + "123";
|
||||
public void testToIdObjectWithStringContainingBinaryHexThrowsICEWithUnevenLengthString() {
|
||||
String unevenHead = AMQPMessageIdHelper.JMS_ID_PREFIX + AMQPMessageIdHelper.AMQP_BINARY_PREFIX + "123";
|
||||
|
||||
try {
|
||||
messageIdHelper.toIdObject(unevenHead);
|
||||
fail("expected exception was not thrown");
|
||||
} catch (ActiveMQAMQPException ex) {
|
||||
} catch (ActiveMQAMQPIllegalStateException iae) {
|
||||
// expected
|
||||
String msg = iae.getMessage();
|
||||
assertTrue("Message was not as expected: " + msg, msg.contains("even length"));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Test that {@link AMQPMessageIdHelper#toIdObject(String)} throws an
|
||||
* {@link IdConversionException} when presented with an encoded binary hex string (after the
|
||||
* prefix) that contains characters other than 0-9 and A-F and a-f, and thus can't be
|
||||
* converted
|
||||
* {@link IdConversionException} when presented with an encoded binary hex
|
||||
* string (after the prefix) that contains characters other than 0-9 and A-F
|
||||
* and a-f, and thus can't be converted
|
||||
*/
|
||||
@Test
|
||||
public void testToIdObjectWithStringContainingBinaryHexThrowsWithNonHexCharacters() {
|
||||
public void testToIdObjectWithStringContainingBinaryHexThrowsICEWithNonHexCharacters() {
|
||||
|
||||
// char before '0'
|
||||
char nonHexChar = '/';
|
||||
String nonHexString = AMQPMessageIdHelper.AMQP_BINARY_PREFIX + nonHexChar + nonHexChar;
|
||||
String nonHexString = AMQPMessageIdHelper.JMS_ID_PREFIX + AMQPMessageIdHelper.AMQP_BINARY_PREFIX + nonHexChar + nonHexChar;
|
||||
|
||||
try {
|
||||
messageIdHelper.toIdObject(nonHexString);
|
||||
fail("expected exception was not thrown");
|
||||
} catch (ActiveMQAMQPException ex) {
|
||||
} catch (ActiveMQAMQPIllegalStateException ice) {
|
||||
// expected
|
||||
String msg = ice.getMessage();
|
||||
assertTrue("Message was not as expected: " + msg, msg.contains("non-hex"));
|
||||
}
|
||||
|
||||
// char after '9', before 'A'
|
||||
nonHexChar = ':';
|
||||
nonHexString = AMQPMessageIdHelper.AMQP_BINARY_PREFIX + nonHexChar + nonHexChar;
|
||||
nonHexString = AMQPMessageIdHelper.JMS_ID_PREFIX + AMQPMessageIdHelper.AMQP_BINARY_PREFIX + nonHexChar + nonHexChar;
|
||||
|
||||
try {
|
||||
messageIdHelper.toIdObject(nonHexString);
|
||||
fail("expected exception was not thrown");
|
||||
} catch (ActiveMQAMQPException ex) {
|
||||
} catch (ActiveMQAMQPIllegalStateException iae) {
|
||||
// expected
|
||||
String msg = iae.getMessage();
|
||||
assertTrue("Message was not as expected: " + msg, msg.contains("non-hex"));
|
||||
}
|
||||
|
||||
// char after 'F', before 'a'
|
||||
nonHexChar = 'G';
|
||||
nonHexString = AMQPMessageIdHelper.AMQP_BINARY_PREFIX + nonHexChar + nonHexChar;
|
||||
nonHexString = AMQPMessageIdHelper.JMS_ID_PREFIX + AMQPMessageIdHelper.AMQP_BINARY_PREFIX + nonHexChar + nonHexChar;
|
||||
|
||||
try {
|
||||
messageIdHelper.toIdObject(nonHexString);
|
||||
fail("expected exception was not thrown");
|
||||
} catch (ActiveMQAMQPException ex) {
|
||||
} catch (ActiveMQAMQPIllegalStateException iae) {
|
||||
// expected
|
||||
String msg = iae.getMessage();
|
||||
assertTrue("Message was not as expected: " + msg, msg.contains("non-hex"));
|
||||
}
|
||||
|
||||
// char after 'f'
|
||||
nonHexChar = 'g';
|
||||
nonHexString = AMQPMessageIdHelper.AMQP_BINARY_PREFIX + nonHexChar + nonHexChar;
|
||||
nonHexString = AMQPMessageIdHelper.JMS_ID_PREFIX + AMQPMessageIdHelper.AMQP_BINARY_PREFIX + nonHexChar + nonHexChar;
|
||||
|
||||
try {
|
||||
messageIdHelper.toIdObject(nonHexString);
|
||||
fail("expected exception was not thrown");
|
||||
} catch (ActiveMQAMQPException ex) {
|
||||
} catch (ActiveMQAMQPIllegalStateException ice) {
|
||||
// expected
|
||||
String msg = ice.getMessage();
|
||||
assertTrue("Message was not as expected: " + msg, msg.contains("non-hex"));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -389,6 +389,31 @@ public class AmqpMessage {
|
|||
return message.getProperties().getGroupId();
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the Subject property on an outbound message using the provided String
|
||||
*
|
||||
* @param subject the String Subject value to set.
|
||||
*/
|
||||
public void setSubject(String subject) {
|
||||
checkReadOnly();
|
||||
lazyCreateProperties();
|
||||
getWrappedMessage().setSubject(subject);
|
||||
}
|
||||
|
||||
/**
|
||||
* Return the set Subject value in String form, if there are no properties
|
||||
* in the given message return null.
|
||||
*
|
||||
* @return the set Subject in String form or null if not set.
|
||||
*/
|
||||
public String getSubject() {
|
||||
if (message.getProperties() == null) {
|
||||
return null;
|
||||
}
|
||||
|
||||
return message.getProperties().getSubject();
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the durable header on the outgoing message.
|
||||
*
|
||||
|
|
|
@ -418,6 +418,37 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
|
|||
connection.close();
|
||||
}
|
||||
|
||||
@Test(timeout = 60000)
|
||||
public void testReceiveWithJMSSelectorFilterOnJMSType() throws Exception {
|
||||
AmqpClient client = createAmqpClient();
|
||||
AmqpConnection connection = addConnection(client.connect());
|
||||
AmqpSession session = connection.createSession();
|
||||
|
||||
AmqpMessage message1 = new AmqpMessage();
|
||||
message1.setText("msg:1");
|
||||
|
||||
AmqpMessage message2 = new AmqpMessage();
|
||||
message2.setSubject("target");
|
||||
message2.setText("msg:2");
|
||||
|
||||
AmqpSender sender = session.createSender(getQueueName());
|
||||
sender.send(message1);
|
||||
sender.send(message2);
|
||||
sender.close();
|
||||
|
||||
AmqpReceiver receiver = session.createReceiver(getQueueName(), "JMSType = 'target'");
|
||||
receiver.flow(2);
|
||||
AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS);
|
||||
assertNotNull("Should have read a message", received);
|
||||
assertEquals("target", received.getSubject());
|
||||
received.accept();
|
||||
|
||||
assertNull(receiver.receive(1, TimeUnit.SECONDS));
|
||||
|
||||
receiver.close();
|
||||
connection.close();
|
||||
}
|
||||
|
||||
@Test(timeout = 60000)
|
||||
public void testAdvancedLinkFlowControl() throws Exception {
|
||||
final int MSG_COUNT = 20;
|
||||
|
|
|
@ -16,6 +16,13 @@
|
|||
*/
|
||||
package org.apache.activemq.artemis.tests.integration.amqp;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Enumeration;
|
||||
import java.util.Random;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import javax.jms.BytesMessage;
|
||||
import javax.jms.Connection;
|
||||
import javax.jms.DeliveryMode;
|
||||
|
@ -28,11 +35,6 @@ import javax.jms.MessageProducer;
|
|||
import javax.jms.QueueBrowser;
|
||||
import javax.jms.Session;
|
||||
import javax.jms.TextMessage;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Enumeration;
|
||||
import java.util.Random;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.apache.activemq.artemis.core.server.Queue;
|
||||
import org.apache.activemq.artemis.tests.util.Wait;
|
||||
|
@ -167,13 +169,29 @@ public class JMSMessageConsumerTest extends JMSClientTestSupport {
|
|||
}
|
||||
|
||||
@Test(timeout = 60000)
|
||||
public void testSelector() throws Exception {
|
||||
public void testSelectorOnTopic() throws Exception {
|
||||
doTestSelector(true);
|
||||
}
|
||||
|
||||
@Test(timeout = 60000)
|
||||
public void testSelectorOnQueue() throws Exception {
|
||||
doTestSelector(false);
|
||||
}
|
||||
|
||||
private void doTestSelector(boolean topic) throws Exception {
|
||||
Connection connection = createConnection();
|
||||
|
||||
try {
|
||||
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
javax.jms.Queue queue = session.createQueue(getQueueName());
|
||||
MessageProducer producer = session.createProducer(queue);
|
||||
Destination destination = null;
|
||||
if (topic) {
|
||||
destination = session.createTopic(getTopicName());
|
||||
} else {
|
||||
destination = session.createQueue(getQueueName());
|
||||
}
|
||||
|
||||
MessageProducer producer = session.createProducer(destination);
|
||||
MessageConsumer messageConsumer = session.createConsumer(destination, "color = 'RED'");
|
||||
|
||||
TextMessage message = session.createTextMessage();
|
||||
message.setText("msg:0");
|
||||
|
@ -185,7 +203,6 @@ public class JMSMessageConsumerTest extends JMSClientTestSupport {
|
|||
|
||||
connection.start();
|
||||
|
||||
MessageConsumer messageConsumer = session.createConsumer(queue, "color = 'RED'");
|
||||
TextMessage m = (TextMessage) messageConsumer.receive(5000);
|
||||
assertNotNull(m);
|
||||
assertEquals("msg:1", m.getText());
|
||||
|
@ -195,38 +212,43 @@ public class JMSMessageConsumerTest extends JMSClientTestSupport {
|
|||
}
|
||||
}
|
||||
|
||||
@SuppressWarnings("rawtypes")
|
||||
@Test(timeout = 30000)
|
||||
public void testSelectorsWithJMSType() throws Exception {
|
||||
Connection connection = createConnection();
|
||||
public void testSelectorsWithJMSTypeOnTopic() throws Exception {
|
||||
doTestSelectorsWithJMSType(true);
|
||||
}
|
||||
|
||||
@Test(timeout = 30000)
|
||||
public void testSelectorsWithJMSTypeOnQueue() throws Exception {
|
||||
doTestSelectorsWithJMSType(false);
|
||||
}
|
||||
|
||||
private void doTestSelectorsWithJMSType(boolean topic) throws Exception {
|
||||
final Connection connection = createConnection();
|
||||
final String type = "myJMSType";
|
||||
|
||||
try {
|
||||
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
javax.jms.Queue queue = session.createQueue(getQueueName());
|
||||
MessageProducer p = session.createProducer(queue);
|
||||
|
||||
TextMessage message = session.createTextMessage();
|
||||
message.setText("text");
|
||||
p.send(message, DeliveryMode.NON_PERSISTENT, Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE);
|
||||
|
||||
TextMessage message2 = session.createTextMessage();
|
||||
String type = "myJMSType";
|
||||
message2.setJMSType(type);
|
||||
message2.setText("text + type");
|
||||
p.send(message2, DeliveryMode.NON_PERSISTENT, Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE);
|
||||
|
||||
QueueBrowser browser = session.createBrowser(queue);
|
||||
Enumeration enumeration = browser.getEnumeration();
|
||||
int count = 0;
|
||||
while (enumeration.hasMoreElements()) {
|
||||
Message m = (Message) enumeration.nextElement();
|
||||
assertTrue(m instanceof TextMessage);
|
||||
count++;
|
||||
Destination destination = null;
|
||||
if (topic) {
|
||||
destination = session.createTopic(getTopicName());
|
||||
} else {
|
||||
destination = session.createQueue(getQueueName());
|
||||
}
|
||||
|
||||
assertEquals(2, count);
|
||||
MessageProducer producer = session.createProducer(destination);
|
||||
MessageConsumer consumer = session.createConsumer(destination, "JMSType = '" + type + "'");
|
||||
|
||||
TextMessage message1 = session.createTextMessage();
|
||||
message1.setText("text");
|
||||
producer.send(message1, DeliveryMode.NON_PERSISTENT, Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE);
|
||||
|
||||
TextMessage message2 = session.createTextMessage();
|
||||
message2.setJMSType(type);
|
||||
message2.setText("text + type");
|
||||
producer.send(message2, DeliveryMode.NON_PERSISTENT, Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE);
|
||||
|
||||
connection.start();
|
||||
|
||||
MessageConsumer consumer = session.createConsumer(queue, "JMSType = '" + type + "'");
|
||||
Message msg = consumer.receive(2000);
|
||||
assertNotNull(msg);
|
||||
assertTrue(msg instanceof TextMessage);
|
||||
|
@ -237,7 +259,48 @@ public class JMSMessageConsumerTest extends JMSClientTestSupport {
|
|||
}
|
||||
}
|
||||
|
||||
@SuppressWarnings("rawtypes")
|
||||
@Test(timeout = 30000)
|
||||
public void testSelectorsWithJMSCorrelationID() throws Exception {
|
||||
Connection connection = createConnection();
|
||||
|
||||
final String correlationID = UUID.randomUUID().toString();
|
||||
|
||||
try {
|
||||
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
javax.jms.Queue queue = session.createQueue(getQueueName());
|
||||
MessageProducer producer = session.createProducer(queue);
|
||||
|
||||
TextMessage message1 = session.createTextMessage();
|
||||
message1.setText("text");
|
||||
producer.send(message1);
|
||||
|
||||
TextMessage message2 = session.createTextMessage();
|
||||
message2.setJMSCorrelationID(correlationID);
|
||||
message2.setText("JMSCorrelationID");
|
||||
producer.send(message2);
|
||||
|
||||
QueueBrowser browser = session.createBrowser(queue);
|
||||
Enumeration<?> enumeration = browser.getEnumeration();
|
||||
int count = 0;
|
||||
while (enumeration.hasMoreElements()) {
|
||||
Message m = (Message) enumeration.nextElement();
|
||||
assertTrue(m instanceof TextMessage);
|
||||
count++;
|
||||
}
|
||||
|
||||
assertEquals(2, count);
|
||||
|
||||
MessageConsumer consumer = session.createConsumer(queue, "JMSCorrelationID = '" + correlationID + "'");
|
||||
Message msg = consumer.receive(2000);
|
||||
assertNotNull(msg);
|
||||
assertTrue(msg instanceof TextMessage);
|
||||
assertEquals("Unexpected JMSCorrelationID value", correlationID, msg.getJMSCorrelationID());
|
||||
assertEquals("Unexpected message content", "JMSCorrelationID", ((TextMessage) msg).getText());
|
||||
} finally {
|
||||
connection.close();
|
||||
}
|
||||
}
|
||||
|
||||
@Test(timeout = 30000)
|
||||
public void testSelectorsWithJMSPriority() throws Exception {
|
||||
Connection connection = createConnection();
|
||||
|
@ -245,18 +308,18 @@ public class JMSMessageConsumerTest extends JMSClientTestSupport {
|
|||
try {
|
||||
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
javax.jms.Queue queue = session.createQueue(getQueueName());
|
||||
MessageProducer p = session.createProducer(queue);
|
||||
MessageProducer producer = session.createProducer(queue);
|
||||
|
||||
TextMessage message = session.createTextMessage();
|
||||
message.setText("hello");
|
||||
p.send(message, DeliveryMode.PERSISTENT, 5, 0);
|
||||
producer.send(message, DeliveryMode.PERSISTENT, 5, 0);
|
||||
|
||||
message = session.createTextMessage();
|
||||
message.setText("hello + 9");
|
||||
p.send(message, DeliveryMode.PERSISTENT, 9, 0);
|
||||
producer.send(message, DeliveryMode.PERSISTENT, 9, 0);
|
||||
|
||||
QueueBrowser browser = session.createBrowser(queue);
|
||||
Enumeration enumeration = browser.getEnumeration();
|
||||
Enumeration<?> enumeration = browser.getEnumeration();
|
||||
int count = 0;
|
||||
while (enumeration.hasMoreElements()) {
|
||||
Message m = (Message) enumeration.nextElement();
|
||||
|
@ -276,8 +339,163 @@ public class JMSMessageConsumerTest extends JMSClientTestSupport {
|
|||
}
|
||||
}
|
||||
|
||||
@Test(timeout = 30000)
|
||||
public void testSelectorsWithJMSXGroupIDOnTopic() throws Exception {
|
||||
doTestSelectorsWithJMSXGroupID(true);
|
||||
}
|
||||
|
||||
@Test(timeout = 30000)
|
||||
public void testSelectorsWithJMSXGroupIDOnQueue() throws Exception {
|
||||
doTestSelectorsWithJMSXGroupID(false);
|
||||
}
|
||||
|
||||
private void doTestSelectorsWithJMSXGroupID(boolean topic) throws Exception {
|
||||
|
||||
Connection connection = createConnection();
|
||||
|
||||
try {
|
||||
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
Destination destination = null;
|
||||
if (topic) {
|
||||
destination = session.createTopic(getTopicName());
|
||||
} else {
|
||||
destination = session.createQueue(getQueueName());
|
||||
}
|
||||
|
||||
MessageProducer producer = session.createProducer(destination);
|
||||
MessageConsumer consumer = session.createConsumer(destination, "JMSXGroupID = '1'");
|
||||
|
||||
TextMessage message = session.createTextMessage();
|
||||
message.setText("group 1 - 1");
|
||||
message.setStringProperty("JMSXGroupID", "1");
|
||||
message.setIntProperty("JMSXGroupSeq", 1);
|
||||
producer.send(message);
|
||||
|
||||
message = session.createTextMessage();
|
||||
message.setText("group 2");
|
||||
message.setStringProperty("JMSXGroupID", "2");
|
||||
producer.send(message);
|
||||
|
||||
message = session.createTextMessage();
|
||||
message.setText("group 1 - 2");
|
||||
message.setStringProperty("JMSXGroupID", "1");
|
||||
message.setIntProperty("JMSXGroupSeq", -1);
|
||||
producer.send(message);
|
||||
|
||||
connection.start();
|
||||
|
||||
Message msg = consumer.receive(2000);
|
||||
assertNotNull(msg);
|
||||
assertTrue(msg instanceof TextMessage);
|
||||
assertEquals("group 1 - 1", ((TextMessage) msg).getText());
|
||||
msg = consumer.receive(2000);
|
||||
assertNotNull(msg);
|
||||
assertTrue(msg instanceof TextMessage);
|
||||
assertEquals("group 1 - 2", ((TextMessage) msg).getText());
|
||||
} finally {
|
||||
connection.close();
|
||||
}
|
||||
}
|
||||
|
||||
@Test(timeout = 30000)
|
||||
public void testSelectorsWithJMSDeliveryOnQueue() throws Exception {
|
||||
final Connection connection = createConnection();
|
||||
|
||||
String selector = "JMSDeliveryMode = 'PERSISTENT'";
|
||||
|
||||
try {
|
||||
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
Destination destination = session.createQueue(getQueueName());
|
||||
|
||||
MessageProducer producer = session.createProducer(destination);
|
||||
MessageConsumer consumer = session.createConsumer(destination, selector);
|
||||
|
||||
TextMessage message1 = session.createTextMessage();
|
||||
message1.setText("non-persistent");
|
||||
producer.send(message1, DeliveryMode.NON_PERSISTENT, Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE);
|
||||
|
||||
TextMessage message2 = session.createTextMessage();
|
||||
message2.setText("persistent");
|
||||
producer.send(message2, DeliveryMode.PERSISTENT, Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE);
|
||||
|
||||
connection.start();
|
||||
|
||||
Message msg = consumer.receive(2000);
|
||||
assertNotNull(msg);
|
||||
assertTrue(msg instanceof TextMessage);
|
||||
assertEquals("Unexpected JMSDeliveryMode value", DeliveryMode.PERSISTENT, msg.getJMSDeliveryMode());
|
||||
assertEquals("Unexpected message content", "persistent", ((TextMessage) msg).getText());
|
||||
} finally {
|
||||
connection.close();
|
||||
}
|
||||
}
|
||||
|
||||
@Test(timeout = 30000)
|
||||
public void testSelectorsWithJMSTimestampOnQueue() throws Exception {
|
||||
final Connection connection = createConnection();
|
||||
|
||||
try {
|
||||
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
Destination destination = session.createQueue(getQueueName());
|
||||
|
||||
MessageProducer producer = session.createProducer(destination);
|
||||
|
||||
TextMessage message1 = session.createTextMessage();
|
||||
message1.setText("filtered");
|
||||
producer.send(message1, DeliveryMode.PERSISTENT, Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE);
|
||||
|
||||
TextMessage message2 = session.createTextMessage();
|
||||
message2.setText("expected");
|
||||
producer.send(message2, DeliveryMode.PERSISTENT, Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE);
|
||||
|
||||
MessageConsumer consumer = session.createConsumer(destination, "JMSTimestamp = " + message2.getJMSTimestamp());
|
||||
|
||||
connection.start();
|
||||
|
||||
Message msg = consumer.receive(2000);
|
||||
assertNotNull(msg);
|
||||
assertTrue(msg instanceof TextMessage);
|
||||
assertEquals("Unexpected JMSTimestamp value", message2.getJMSTimestamp(), msg.getJMSTimestamp());
|
||||
assertEquals("Unexpected message content", "expected", ((TextMessage) msg).getText());
|
||||
} finally {
|
||||
connection.close();
|
||||
}
|
||||
}
|
||||
|
||||
@Test(timeout = 30000)
|
||||
public void testSelectorsWithJMSExpirationOnQueue() throws Exception {
|
||||
final Connection connection = createConnection();
|
||||
|
||||
try {
|
||||
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
Destination destination = session.createQueue(getQueueName());
|
||||
|
||||
MessageProducer producer = session.createProducer(destination);
|
||||
|
||||
TextMessage message1 = session.createTextMessage();
|
||||
message1.setText("filtered");
|
||||
producer.send(message1, DeliveryMode.PERSISTENT, Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE);
|
||||
|
||||
TextMessage message2 = session.createTextMessage();
|
||||
message2.setText("expected");
|
||||
producer.send(message2, DeliveryMode.PERSISTENT, Message.DEFAULT_PRIORITY, 60000);
|
||||
|
||||
MessageConsumer consumer = session.createConsumer(destination, "JMSExpiration = " + message2.getJMSExpiration());
|
||||
|
||||
connection.start();
|
||||
|
||||
Message msg = consumer.receive(2000);
|
||||
assertNotNull(msg);
|
||||
assertTrue(msg instanceof TextMessage);
|
||||
assertEquals("Unexpected JMSExpiration value", message2.getJMSExpiration(), msg.getJMSExpiration());
|
||||
assertEquals("Unexpected message content", "expected", ((TextMessage) msg).getText());
|
||||
} finally {
|
||||
connection.close();
|
||||
}
|
||||
}
|
||||
|
||||
@Test(timeout = 60000)
|
||||
public void testJMSSelectorFiltersJMSMessageID() throws Exception {
|
||||
public void testJMSSelectorFiltersJMSMessageIDOnTopic() throws Exception {
|
||||
Connection connection = createConnection();
|
||||
|
||||
try {
|
||||
|
|
Loading…
Reference in New Issue