ARTEMIS-1314 Fixing issues with JMS selectors on AMQP

Allows for JMS selectors on JMSCorrelationID as well as JMSXGroupID
and JMSXUserID along with some fixes to avoid an NPE case and fixes
to the conversion of AMQP MessageID and CorrelationID values when
doing cross protocol mappings.  Adds new tests to cover more cases
of using the JMS selector with Qpid JMS and the AMQP test client.
This commit is contained in:
Timothy Bish 2017-10-04 08:49:07 -04:00 committed by Clebert Suconic
parent 8af0569521
commit 6d94997aa8
7 changed files with 1099 additions and 335 deletions

View File

@ -33,6 +33,7 @@ import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.persistence.Persister;
import org.apache.activemq.artemis.protocol.amqp.converter.AMQPConverter;
import org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageIdHelper;
import org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport;
import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable;
import org.apache.activemq.artemis.protocol.amqp.util.TLSEncode;
@ -557,7 +558,6 @@ public class AMQPMessage extends RefCountMessage {
}
}
@Override
public org.apache.activemq.artemis.api.core.Message setUserID(Object userID) {
return null;
@ -725,7 +725,6 @@ public class AMQPMessage extends RefCountMessage {
return this;
}
@Override
public byte[] getExtraBytesProperty(SimpleString key) throws ActiveMQPropertyConversionException {
if (extraProperties == null) {
@ -735,7 +734,6 @@ public class AMQPMessage extends RefCountMessage {
}
}
@Override
public byte[] removeExtraBytesProperty(SimpleString key) throws ActiveMQPropertyConversionException {
if (extraProperties == null) {
@ -745,8 +743,6 @@ public class AMQPMessage extends RefCountMessage {
}
}
@Override
public org.apache.activemq.artemis.api.core.Message putBooleanProperty(String key, boolean value) {
getApplicationPropertiesMap().put(key, Boolean.valueOf(value));
@ -904,9 +900,19 @@ public class AMQPMessage extends RefCountMessage {
@Override
public Object getObjectProperty(String key) {
if (key.equals(MessageUtil.TYPE_HEADER_NAME.toString())) {
if (getProperties() != null) {
return getProperties().getSubject();
}
} else if (key.equals(MessageUtil.CONNECTION_ID_PROPERTY_NAME.toString())) {
return getConnectionID();
} else if (key.equals(MessageUtil.JMSXGROUPID)) {
return getGroupID();
} else if (key.equals(MessageUtil.JMSXUSERID)) {
return getAMQPUserID();
} else if (key.equals(MessageUtil.CORRELATIONID_HEADER_NAME.toString())) {
if (getProperties() != null && getProperties().getCorrelationId() != null) {
return AMQPMessageIdHelper.INSTANCE.toCorrelationIdString(getProperties().getCorrelationId());
}
} else {
Object value = getApplicationPropertiesMap().get(key);
if (value instanceof UnsignedInteger ||
@ -918,6 +924,8 @@ public class AMQPMessage extends RefCountMessage {
return value;
}
}
return null;
}
@Override

View File

@ -1,5 +1,4 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@ -16,7 +15,6 @@
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.activemq.artemis.protocol.amqp.converter;
@ -28,26 +26,32 @@ import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.UnsignedLong;
/**
* Helper class for identifying and converting message-id and correlation-id values between the
* AMQP types and the Strings values used by JMS.
* Helper class for identifying and converting message-id and correlation-id
* values between the AMQP types and the Strings values used by JMS.
*
* <p>
* AMQP messages allow for 4 types of message-id/correlation-id: message-id-string,
* message-id-binary, message-id-uuid, or message-id-ulong. In order to accept or return a
* string representation of these for interoperability with other AMQP clients, the following
* encoding can be used after removing or before adding the "ID:" prefix used for a JMSMessageID
* AMQP messages allow for 4 types of message-id/correlation-id:
* message-id-string, message-id-binary, message-id-uuid, or message-id-ulong.
* In order to accept or return a string representation of these for
* interoperability with other AMQP clients, the following encoding can be used
* after removing or before adding the "ID:" prefix used for a JMSMessageID
* value:<br>
* <p>
*
* {@literal "AMQP_BINARY:<hex representation of binary content>"}<br>
* {@literal "AMQP_UUID:<string representation of uuid>"}<br>
* {@literal "AMQP_ULONG:<string representation of ulong>"}<br>
* {@literal "AMQP_STRING:<string>"}<br>
*
* <p>
* The AMQP_STRING encoding exists only for escaping message-id-string values that happen to
* begin with one of the encoding prefixes (including AMQP_STRING itself). It MUST NOT be used
* otherwise.
* The AMQP_STRING encoding exists only for escaping message-id-string values
* that happen to begin with one of the encoding prefixes (including AMQP_STRING
* itself). It MUST NOT be used otherwise.
*
* <p>
* When provided a string for conversion which attempts to identify itself as an encoded binary,
* uuid, or ulong but can't be converted into the indicated format, an exception will be thrown.
* When provided a string for conversion which attempts to identify itself as an
* encoded binary, uuid, or ulong but can't be converted into the indicated
* format, an exception will be thrown.
*
*/
public class AMQPMessageIdHelper {
@ -57,109 +61,213 @@ public class AMQPMessageIdHelper {
public static final String AMQP_UUID_PREFIX = "AMQP_UUID:";
public static final String AMQP_ULONG_PREFIX = "AMQP_ULONG:";
public static final String AMQP_BINARY_PREFIX = "AMQP_BINARY:";
public static final String AMQP_NO_PREFIX = "AMQP_NO_PREFIX:";
public static final String JMS_ID_PREFIX = "ID:";
private static final String AMQP_PREFIX = "AMQP_";
private static final int JMS_ID_PREFIX_LENGTH = JMS_ID_PREFIX.length();
private static final int AMQP_UUID_PREFIX_LENGTH = AMQP_UUID_PREFIX.length();
private static final int AMQP_ULONG_PREFIX_LENGTH = AMQP_ULONG_PREFIX.length();
private static final int AMQP_STRING_PREFIX_LENGTH = AMQP_STRING_PREFIX.length();
private static final int AMQP_BINARY_PREFIX_LENGTH = AMQP_BINARY_PREFIX.length();
private static final int AMQP_NO_PREFIX_LENGTH = AMQP_NO_PREFIX.length();
private static final char[] HEX_CHARS = "0123456789ABCDEF".toCharArray();
/**
* Takes the provided AMQP messageId style object, and convert it to a base string. Encodes
* type information as a prefix where necessary to convey or escape the type of the provided
* object.
* Checks whether the given string begins with "ID:" prefix used to denote a
* JMSMessageID
*
* @param messageId
* the raw messageId object to process
* @return the base string to be used in creating the actual id.
* @param string
* the string to check
* @return true if and only id the string begins with "ID:"
*/
public String toBaseMessageIdString(Object messageId) {
if (messageId == null) {
return null;
} else if (messageId instanceof String) {
String stringId = (String) messageId;
public boolean hasMessageIdPrefix(String string) {
if (string == null) {
return false;
}
// If the given string has a type encoding prefix,
// we need to escape it as an encoded string (even if
// the existing encoding prefix was also for string)
if (hasTypeEncodingPrefix(stringId)) {
return AMQP_STRING_PREFIX + stringId;
return string.startsWith(JMS_ID_PREFIX);
}
public String toMessageIdString(Object idObject) {
if (idObject instanceof String) {
final String stringId = (String) idObject;
boolean hasMessageIdPrefix = hasMessageIdPrefix(stringId);
if (!hasMessageIdPrefix) {
// For JMSMessageID, has no "ID:" prefix, we need to record
// that for later use as a JMSCorrelationID.
return JMS_ID_PREFIX + AMQP_NO_PREFIX + stringId;
} else if (hasTypeEncodingPrefix(stringId, JMS_ID_PREFIX_LENGTH)) {
// We are for a JMSMessageID value, but have 'ID:' followed by
// one of the encoding prefixes. Need to escape the entire string
// to preserve for later re-use as a JMSCorrelationID.
return JMS_ID_PREFIX + AMQP_STRING_PREFIX + stringId;
} else {
// It has "ID:" prefix and doesn't have encoding prefix, use it as-is.
return stringId;
}
} else if (messageId instanceof UUID) {
return AMQP_UUID_PREFIX + messageId.toString();
} else if (messageId instanceof UnsignedLong) {
return AMQP_ULONG_PREFIX + messageId.toString();
} else if (messageId instanceof Binary) {
ByteBuffer dup = ((Binary) messageId).asByteBuffer();
} else {
// Not a string, convert it
return convertToIdString(idObject);
}
}
public String toCorrelationIdString(Object idObject) {
if (idObject instanceof String) {
final String stringId = (String) idObject;
boolean hasMessageIdPrefix = hasMessageIdPrefix(stringId);
if (!hasMessageIdPrefix) {
// For JMSCorrelationID, has no "ID:" prefix, use it as-is.
return stringId;
} else if (hasTypeEncodingPrefix(stringId, JMS_ID_PREFIX_LENGTH)) {
// We are for a JMSCorrelationID value, but have 'ID:' followed by
// one of the encoding prefixes. Need to escape the entire string
// to preserve for later re-use as a JMSCorrelationID.
return JMS_ID_PREFIX + AMQP_STRING_PREFIX + stringId;
} else {
// It has "ID:" prefix and doesn't have encoding prefix, use it as-is.
return stringId;
}
} else {
// Not a string, convert it
return convertToIdString(idObject);
}
}
/**
* Takes the provided non-String AMQP message-id/correlation-id object, and
* convert it it to a String usable as either a JMSMessageID or
* JMSCorrelationID value, encoding the type information as a prefix to
* convey for later use in reversing the process if used to set
* JMSCorrelationID on a message.
*
* @param idObject
* the object to process
* @return string to be used for the actual JMS ID.
*/
private String convertToIdString(Object idObject) {
if (idObject == null) {
return null;
}
if (idObject instanceof UUID) {
return JMS_ID_PREFIX + AMQP_UUID_PREFIX + idObject.toString();
} else if (idObject instanceof UnsignedLong) {
return JMS_ID_PREFIX + AMQP_ULONG_PREFIX + idObject.toString();
} else if (idObject instanceof Binary) {
ByteBuffer dup = ((Binary) idObject).asByteBuffer();
byte[] bytes = new byte[dup.remaining()];
dup.get(bytes);
String hex = convertBinaryToHexString(bytes);
return AMQP_BINARY_PREFIX + hex;
return JMS_ID_PREFIX + AMQP_BINARY_PREFIX + hex;
} else {
throw new IllegalArgumentException("Unsupported type provided: " + messageId.getClass());
throw new IllegalArgumentException("Unsupported type provided: " + idObject.getClass());
}
}
private boolean hasTypeEncodingPrefix(String stringId, int offset) {
if (!stringId.startsWith(AMQP_PREFIX, offset)) {
return false;
}
return hasAmqpBinaryPrefix(stringId, offset) || hasAmqpUuidPrefix(stringId, offset) || hasAmqpUlongPrefix(stringId, offset)
|| hasAmqpStringPrefix(stringId, offset) || hasAmqpNoPrefix(stringId, offset);
}
private boolean hasAmqpStringPrefix(String stringId, int offset) {
return stringId.startsWith(AMQP_STRING_PREFIX, offset);
}
private boolean hasAmqpUlongPrefix(String stringId, int offset) {
return stringId.startsWith(AMQP_ULONG_PREFIX, offset);
}
private boolean hasAmqpUuidPrefix(String stringId, int offset) {
return stringId.startsWith(AMQP_UUID_PREFIX, offset);
}
private boolean hasAmqpBinaryPrefix(String stringId, int offset) {
return stringId.startsWith(AMQP_BINARY_PREFIX, offset);
}
private boolean hasAmqpNoPrefix(String stringId, int offset) {
return stringId.startsWith(AMQP_NO_PREFIX, offset);
}
/**
* Takes the provided base id string and return the appropriate amqp messageId style object.
* Converts the type based on any relevant encoding information found as a prefix.
* Takes the provided id string and return the appropriate amqp messageId
* style object. Converts the type based on any relevant encoding information
* found as a prefix.
*
* @param baseId
* the object to be converted to an AMQP MessageId value.
* @param origId
* the object to be converted
* @return the AMQP messageId style object
* @throws ActiveMQAMQPIllegalStateException
* if the provided baseId String indicates an encoded type but can't be converted to
* that type.
*
* @throws IllegalArgument
* if the provided baseId String indicates an encoded type but can't
* be converted to that type.
*/
public Object toIdObject(String baseId) throws ActiveMQAMQPIllegalStateException {
if (baseId == null) {
public Object toIdObject(final String origId) throws ActiveMQAMQPIllegalStateException {
if (origId == null) {
return null;
}
if (!AMQPMessageIdHelper.INSTANCE.hasMessageIdPrefix(origId)) {
// We have a string without any "ID:" prefix, it is an
// application-specific String, use it as-is.
return origId;
}
try {
if (hasAmqpUuidPrefix(baseId)) {
String uuidString = strip(baseId, AMQP_UUID_PREFIX_LENGTH);
if (hasAmqpNoPrefix(origId, JMS_ID_PREFIX_LENGTH)) {
// Prefix telling us there was originally no "ID:" prefix,
// strip it and return the remainder
return origId.substring(JMS_ID_PREFIX_LENGTH + AMQP_NO_PREFIX_LENGTH);
} else if (hasAmqpUuidPrefix(origId, JMS_ID_PREFIX_LENGTH)) {
String uuidString = origId.substring(JMS_ID_PREFIX_LENGTH + AMQP_UUID_PREFIX_LENGTH);
return UUID.fromString(uuidString);
} else if (hasAmqpUlongPrefix(baseId)) {
String longString = strip(baseId, AMQP_ULONG_PREFIX_LENGTH);
return UnsignedLong.valueOf(longString);
} else if (hasAmqpStringPrefix(baseId)) {
return strip(baseId, AMQP_STRING_PREFIX_LENGTH);
} else if (hasAmqpBinaryPrefix(baseId)) {
String hexString = strip(baseId, AMQP_BINARY_PREFIX_LENGTH);
} else if (hasAmqpUlongPrefix(origId, JMS_ID_PREFIX_LENGTH)) {
String ulongString = origId.substring(JMS_ID_PREFIX_LENGTH + AMQP_ULONG_PREFIX_LENGTH);
return UnsignedLong.valueOf(ulongString);
} else if (hasAmqpStringPrefix(origId, JMS_ID_PREFIX_LENGTH)) {
return origId.substring(JMS_ID_PREFIX_LENGTH + AMQP_STRING_PREFIX_LENGTH);
} else if (hasAmqpBinaryPrefix(origId, JMS_ID_PREFIX_LENGTH)) {
String hexString = origId.substring(JMS_ID_PREFIX_LENGTH + AMQP_BINARY_PREFIX_LENGTH);
byte[] bytes = convertHexStringToBinary(hexString);
return new Binary(bytes);
} else {
// We have a string without any type prefix, transmit it as-is.
return baseId;
// We have a string without any encoding prefix needing processed,
// so transmit it as-is, including the "ID:"
return origId;
}
} catch (IllegalArgumentException e) {
throw new ActiveMQAMQPIllegalStateException("Unable to convert ID value");
} catch (IllegalArgumentException iae) {
throw new ActiveMQAMQPIllegalStateException(iae.getMessage());
}
}
/**
* Convert the provided hex-string into a binary representation where each byte represents
* two characters of the hex string.
* <p>
* Convert the provided hex-string into a binary representation where each
* byte represents two characters of the hex string.
*
* The hex characters may be upper or lower case.
*
* @param hexString
* string to convert to a binary value.
* string to convert
* @return a byte array containing the binary representation
* @throws IllegalArgumentException
* if the provided String is a non-even length or contains non-hex characters
* if the provided String is a non-even length or contains non-hex
* characters
*/
public byte[] convertHexStringToBinary(String hexString) throws IllegalArgumentException {
int length = hexString.length();
// As each byte needs two characters in the hex encoding, the string must be an even
// length.
// As each byte needs two characters in the hex encoding, the string must
// be an even length.
if (length % 2 != 0) {
throw new IllegalArgumentException("The provided hex String must be an even length, but was of length " + length + ": " + hexString);
}
@ -179,14 +287,32 @@ public class AMQPMessageIdHelper {
return binary;
}
private int hexCharToInt(char ch, String orig) throws IllegalArgumentException {
if (ch >= '0' && ch <= '9') {
// subtract '0' to get difference in position as an int
return ch - '0';
} else if (ch >= 'A' && ch <= 'F') {
// subtract 'A' to get difference in position as an int
// and then add 10 for the offset of 'A'
return ch - 'A' + 10;
} else if (ch >= 'a' && ch <= 'f') {
// subtract 'a' to get difference in position as an int
// and then add 10 for the offset of 'a'
return ch - 'a' + 10;
}
throw new IllegalArgumentException("The provided hex string contains non-hex character '" + ch + "': " + orig);
}
/**
* Convert the provided binary into a hex-string representation where each character
* represents 4 bits of the provided binary, i.e each byte requires two characters.
* <p>
* Convert the provided binary into a hex-string representation where each
* character represents 4 bits of the provided binary, i.e each byte requires
* two characters.
*
* The returned hex characters are upper-case.
*
* @param bytes
* the binary value to convert to a hex String instance.
* binary to convert
* @return a String containing a hex representation of the bytes
*/
public String convertBinaryToHexString(byte[] bytes) {
@ -206,47 +332,4 @@ public class AMQPMessageIdHelper {
return builder.toString();
}
// ----- Internal implementation ------------------------------------------//
private boolean hasTypeEncodingPrefix(String stringId) {
return hasAmqpBinaryPrefix(stringId) || hasAmqpUuidPrefix(stringId) || hasAmqpUlongPrefix(stringId) || hasAmqpStringPrefix(stringId);
}
private boolean hasAmqpStringPrefix(String stringId) {
return stringId.startsWith(AMQP_STRING_PREFIX);
}
private boolean hasAmqpUlongPrefix(String stringId) {
return stringId.startsWith(AMQP_ULONG_PREFIX);
}
private boolean hasAmqpUuidPrefix(String stringId) {
return stringId.startsWith(AMQP_UUID_PREFIX);
}
private boolean hasAmqpBinaryPrefix(String stringId) {
return stringId.startsWith(AMQP_BINARY_PREFIX);
}
private String strip(String id, int numChars) {
return id.substring(numChars);
}
private int hexCharToInt(char ch, String orig) throws IllegalArgumentException {
if (ch >= '0' && ch <= '9') {
// subtract '0' to get difference in position as an int
return ch - '0';
} else if (ch >= 'A' && ch <= 'F') {
// subtract 'A' to get difference in position as an int
// and then add 10 for the offset of 'A'
return ch - 'A' + 10;
} else if (ch >= 'a' && ch <= 'f') {
// subtract 'a' to get difference in position as an int
// and then add 10 for the offset of 'a'
return ch - 'a' + 10;
}
throw new IllegalArgumentException("The provided hex string contains non-hex character '" + ch + "': " + orig);
}
}

View File

@ -17,8 +17,27 @@
package org.apache.activemq.artemis.protocol.amqp.converter;
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import static org.apache.activemq.artemis.api.core.Message.HDR_SCHEDULED_DELIVERY_TIME;
import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_CONTENT_ENCODING;
import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_CONTENT_TYPE;
import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_FIRST_ACQUIRER;
import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_FOOTER_PREFIX;
import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_HEADER;
import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_HEADER_DURABLE;
import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_HEADER_PRIORITY;
import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_MESSAGE_ANNOTATION_PREFIX;
import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_REPLYTO_GROUP_ID;
import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.OCTET_STREAM_CONTENT_TYPE;
import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.SERIALIZED_JAVA_OBJECT_CONTENT_TYPE;
import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.createBytesMessage;
import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.createMapMessage;
import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.createMessage;
import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.createObjectMessage;
import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.createStreamMessage;
import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.createTextMessage;
import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.getCharsetForTextualContent;
import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.isContentType;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.CharacterCodingException;
@ -28,8 +47,9 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import org.apache.activemq.artemis.api.core.ICoreMessage;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
@ -59,26 +79,8 @@ import org.apache.qpid.proton.amqp.messaging.Properties;
import org.apache.qpid.proton.amqp.messaging.Section;
import org.apache.qpid.proton.codec.WritableBuffer;
import static org.apache.activemq.artemis.api.core.Message.HDR_SCHEDULED_DELIVERY_TIME;
import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_CONTENT_ENCODING;
import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_CONTENT_TYPE;
import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_FIRST_ACQUIRER;
import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_FOOTER_PREFIX;
import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_HEADER;
import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_HEADER_DURABLE;
import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_HEADER_PRIORITY;
import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_MESSAGE_ANNOTATION_PREFIX;
import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_REPLYTO_GROUP_ID;
import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.OCTET_STREAM_CONTENT_TYPE;
import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.SERIALIZED_JAVA_OBJECT_CONTENT_TYPE;
import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.createBytesMessage;
import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.createMapMessage;
import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.createMessage;
import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.createObjectMessage;
import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.createStreamMessage;
import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.createTextMessage;
import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.getCharsetForTextualContent;
import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.isContentType;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
/**
* This class was created just to separate concerns on AMQPConverter.
@ -86,6 +88,7 @@ import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSup
* */
public class AmqpCoreConverter {
@SuppressWarnings("unchecked")
public static ICoreMessage toCore(AMQPMessage message) throws Exception {
Section body = message.getProtonMessage().getBody();
@ -189,6 +192,7 @@ public class AmqpCoreConverter {
return result != null ? result.getInnerMessage() : null;
}
@SuppressWarnings("unchecked")
protected static ServerJMSMessage populateMessage(ServerJMSMessage jms, org.apache.qpid.proton.message.Message amqp) throws Exception {
Header header = amqp.getHeader();
if (header != null) {
@ -250,11 +254,10 @@ public class AmqpCoreConverter {
final Properties properties = amqp.getProperties();
if (properties != null) {
if (properties.getMessageId() != null) {
jms.setJMSMessageID(AMQPMessageIdHelper.INSTANCE.toBaseMessageIdString(properties.getMessageId()));
jms.setJMSMessageID(AMQPMessageIdHelper.INSTANCE.toMessageIdString(properties.getMessageId()));
}
Binary userId = properties.getUserId();
if (userId != null) {
// TODO - Better Way to set this?
jms.setStringProperty("JMSXUserID", new String(userId.getArray(), userId.getArrayOffset(), userId.getLength(), StandardCharsets.UTF_8));
}
if (properties.getTo() != null) {
@ -267,7 +270,7 @@ public class AmqpCoreConverter {
jms.setJMSReplyTo(new ServerDestination(properties.getReplyTo()));
}
if (properties.getCorrelationId() != null) {
jms.setJMSCorrelationID(AMQPMessageIdHelper.INSTANCE.toBaseMessageIdString(properties.getCorrelationId()));
jms.setJMSCorrelationID(AMQPMessageIdHelper.INSTANCE.toCorrelationIdString(properties.getCorrelationId()));
}
if (properties.getContentType() != null) {
jms.setStringProperty(JMS_AMQP_CONTENT_TYPE, properties.getContentType().toString());
@ -357,6 +360,4 @@ public class AmqpCoreConverter {
msg.setObjectProperty(key, value);
}
}
}

View File

@ -1,5 +1,4 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@ -16,177 +15,542 @@
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.activemq.artemis.protocol.amqp.converter.message;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.util.UUID;
import org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageIdHelper;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPIllegalStateException;
import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.UnsignedLong;
import org.junit.Before;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.fail;
public class AMQPMessageIdHelperTest {
private AMQPMessageIdHelper messageIdHelper;
@Before
public void setUp() throws Exception {
messageIdHelper = new AMQPMessageIdHelper();
messageIdHelper = AMQPMessageIdHelper.INSTANCE;
}
/**
* Test that {@link AMQPMessageIdHelper#toBaseMessageIdString(Object)} returns null if given
* null
* Test that {@link AMQPMessageIdHelper#hasMessageIdPrefix(String)} returns
* true for strings that begin "ID:"
*/
@Test
public void testToBaseMessageIdStringWithNull() {
public void testHasIdPrefixWithPrefix() {
String myId = "ID:something";
assertTrue("'ID:' prefix should have been identified", messageIdHelper.hasMessageIdPrefix(myId));
}
/**
* Test that {@link AMQPMessageIdHelper#hasMessageIdPrefix(String)} returns
* false for string beings "ID" without colon.
*/
@Test
public void testHasIdPrefixWithIDButNoColonPrefix() {
String myIdNoColon = "IDsomething";
assertFalse("'ID' prefix should not have been identified without trailing colon", messageIdHelper.hasMessageIdPrefix(myIdNoColon));
}
/**
* Test that {@link AMQPMessageIdHelper#hasMessageIdPrefix(String)} returns
* false for null
*/
@Test
public void testHasIdPrefixWithNull() {
String nullString = null;
assertNull("null string should have been returned", messageIdHelper.toBaseMessageIdString(nullString));
assertFalse("null string should not result in identification as having the prefix", messageIdHelper.hasMessageIdPrefix(nullString));
}
/**
* Test that {@link AMQPMessageIdHelper#toBaseMessageIdString(Object)} throws an IAE if given
* an unexpected object type.
* Test that {@link AMQPMessageIdHelper#hasMessageIdPrefix(String)} returns
* false for strings that doesnt have "ID:" anywhere
*/
@Test
public void testToBaseMessageIdStringThrowsIAEWithUnexpectedType() {
public void testHasIdPrefixWithoutPrefix() {
String myNonId = "something";
assertFalse("string without 'ID:' anywhere should not have been identified as having the prefix", messageIdHelper.hasMessageIdPrefix(myNonId));
}
/**
* Test that {@link AMQPMessageIdHelper#hasMessageIdPrefix(String)} returns
* false for strings has lowercase "id:" prefix
*/
@Test
public void testHasIdPrefixWithLowercaseID() {
String myLowerCaseNonId = "id:something";
assertFalse("lowercase 'id:' prefix should not result in identification as having 'ID:' prefix", messageIdHelper.hasMessageIdPrefix(myLowerCaseNonId));
}
/**
* Test that {@link AMQPMessageIdHelper#toMessageIdString(Object)} returns
* null if given null
*/
@Test
public void testToMessageIdStringWithNull() {
assertNull("null string should have been returned", messageIdHelper.toMessageIdString(null));
}
/**
* Test that {@link AMQPMessageIdHelper#toMessageIdString(Object)} throws an
* IAE if given an unexpected object type.
*/
@Test
public void testToMessageIdStringThrowsIAEWithUnexpectedType() {
try {
messageIdHelper.toBaseMessageIdString(new Object());
messageIdHelper.toMessageIdString(new Object());
fail("expected exception not thrown");
} catch (IllegalArgumentException iae) {
// expected
}
}
/**
* Test that {@link AMQPMessageIdHelper#toBaseMessageIdString(Object)} returns the given
* basic string unchanged
*/
@Test
public void testToBaseMessageIdStringWithString() {
String stringMessageId = "myIdString";
String baseMessageIdString = messageIdHelper.toBaseMessageIdString(stringMessageId);
assertNotNull("null string should not have been returned", baseMessageIdString);
assertEquals("expected base id string was not returned", stringMessageId, baseMessageIdString);
private void doToMessageIdTestImpl(Object idObject, String expected) {
String idString = messageIdHelper.toMessageIdString(idObject);
assertNotNull("null string should not have been returned", idString);
assertEquals("expected id string was not returned", expected, idString);
}
/**
* Test that {@link AMQPMessageIdHelper#toBaseMessageIdString(Object)} returns a string
* indicating an AMQP encoded string, when the given string happens to already begin with the
* {@link AMQPMessageIdHelper#AMQP_UUID_PREFIX}.
* Test that {@link AMQPMessageIdHelper#toMessageIdString(Object)} returns
* the given basic "ID:content" string unchanged.
*/
@Test
public void testToBaseMessageIdStringWithStringBeginningWithEncodingPrefixForUUID() {
public void testToMessageIdStringWithString() {
String stringId = "ID:myIdString";
doToMessageIdTestImpl(stringId, stringId);
}
/**
* Test that {@link AMQPMessageIdHelper#toMessageIdString(Object)} returns
* the given basic string with the 'no prefix' prefix and "ID:" prefix.
*/
@Test
public void testToMessageIdStringWithStringNoPrefix() {
String stringId = "myIdStringNoPrefix";
String expected = AMQPMessageIdHelper.JMS_ID_PREFIX + AMQPMessageIdHelper.AMQP_NO_PREFIX + stringId;
doToMessageIdTestImpl(stringId, expected);
}
/**
* Test that {@link AMQPMessageIdHelper#toMessageIdString(Object)} returns a
* string indicating lack of "ID:" prefix, when the given string happens to
* begin with the {@link AMQPMessageIdHelper#AMQP_UUID_PREFIX}.
*/
@Test
public void testToMessageIdStringWithStringBeginningWithEncodingPrefixForUUID() {
String uuidStringMessageId = AMQPMessageIdHelper.AMQP_UUID_PREFIX + UUID.randomUUID();
String expected = AMQPMessageIdHelper.AMQP_STRING_PREFIX + uuidStringMessageId;
String expected = AMQPMessageIdHelper.JMS_ID_PREFIX + AMQPMessageIdHelper.AMQP_NO_PREFIX + uuidStringMessageId;
String baseMessageIdString = messageIdHelper.toBaseMessageIdString(uuidStringMessageId);
assertNotNull("null string should not have been returned", baseMessageIdString);
assertEquals("expected base id string was not returned", expected, baseMessageIdString);
doToMessageIdTestImpl(uuidStringMessageId, expected);
}
/**
* Test that {@link AMQPMessageIdHelper#toBaseMessageIdString(Object)} returns a string
* indicating an AMQP encoded string, when the given string happens to already begin with the
* {@link AMQPMessageIdHelper#AMQP_ULONG_PREFIX}.
* Test that {@link AMQPMessageIdHelper#toMessageIdString(Object)} returns a
* string indicating lack of "ID:" prefix, when the given string happens to
* begin with the {@link AMQPMessageIdHelper#AMQP_ULONG_PREFIX}.
*/
@Test
public void testToBaseMessageIdStringWithStringBeginningWithEncodingPrefixForLong() {
public void testToMessageIdStringWithStringBeginningWithEncodingPrefixForLong() {
String longStringMessageId = AMQPMessageIdHelper.AMQP_ULONG_PREFIX + Long.valueOf(123456789L);
String expected = AMQPMessageIdHelper.AMQP_STRING_PREFIX + longStringMessageId;
String expected = AMQPMessageIdHelper.JMS_ID_PREFIX + AMQPMessageIdHelper.AMQP_NO_PREFIX + longStringMessageId;
String baseMessageIdString = messageIdHelper.toBaseMessageIdString(longStringMessageId);
assertNotNull("null string should not have been returned", baseMessageIdString);
assertEquals("expected base id string was not returned", expected, baseMessageIdString);
doToMessageIdTestImpl(longStringMessageId, expected);
}
/**
* Test that {@link AMQPMessageIdHelper#toBaseMessageIdString(Object)} returns a string
* indicating an AMQP encoded string, when the given string happens to already begin with the
* {@link AMQPMessageIdHelper#AMQP_BINARY_PREFIX}.
* Test that {@link AMQPMessageIdHelper#toMessageIdString(Object)} returns a
* string indicating lack of "ID:" prefix, when the given string happens to
* begin with the {@link AMQPMessageIdHelper#AMQP_BINARY_PREFIX}.
*/
@Test
public void testToBaseMessageIdStringWithStringBeginningWithEncodingPrefixForBinary() {
public void testToMessageIdStringWithStringBeginningWithEncodingPrefixForBinary() {
String binaryStringMessageId = AMQPMessageIdHelper.AMQP_BINARY_PREFIX + "0123456789ABCDEF";
String expected = AMQPMessageIdHelper.AMQP_STRING_PREFIX + binaryStringMessageId;
String expected = AMQPMessageIdHelper.JMS_ID_PREFIX + AMQPMessageIdHelper.AMQP_NO_PREFIX + binaryStringMessageId;
String baseMessageIdString = messageIdHelper.toBaseMessageIdString(binaryStringMessageId);
assertNotNull("null string should not have been returned", baseMessageIdString);
assertEquals("expected base id string was not returned", expected, baseMessageIdString);
doToMessageIdTestImpl(binaryStringMessageId, expected);
}
/**
* Test that {@link AMQPMessageIdHelper#toBaseMessageIdString(Object)} returns a string
* indicating an AMQP encoded string (effectively twice), when the given string happens to
* already begin with the {@link AMQPMessageIdHelper#AMQP_STRING_PREFIX}.
* Test that {@link AMQPMessageIdHelper#toMessageIdString(Object)} returns a
* string indicating lack of "ID:" prefix, when the given string happens to
* begin with the {@link AMQPMessageIdHelper#AMQP_STRING_PREFIX}.
*/
@Test
public void testToBaseMessageIdStringWithStringBeginningWithEncodingPrefixForString() {
public void testToMessageIdStringWithStringBeginningWithEncodingPrefixForString() {
String stringMessageId = AMQPMessageIdHelper.AMQP_STRING_PREFIX + "myStringId";
String expected = AMQPMessageIdHelper.AMQP_STRING_PREFIX + stringMessageId;
String expected = AMQPMessageIdHelper.JMS_ID_PREFIX + AMQPMessageIdHelper.AMQP_NO_PREFIX + stringMessageId;
String baseMessageIdString = messageIdHelper.toBaseMessageIdString(stringMessageId);
assertNotNull("null string should not have been returned", baseMessageIdString);
assertEquals("expected base id string was not returned", expected, baseMessageIdString);
doToMessageIdTestImpl(stringMessageId, expected);
}
/**
* Test that {@link AMQPMessageIdHelper#toBaseMessageIdString(Object)} returns a string
* indicating an AMQP encoded UUID when given a UUID object.
* Test that {@link AMQPMessageIdHelper#toMessageIdString(Object)} returns a
* string indicating lack of "ID:" prefix, effectively twice, when the given
* string happens to begin with the
* {@link AMQPMessageIdHelper#AMQP_NO_PREFIX}.
*/
@Test
public void testToBaseMessageIdStringWithUUID() {
public void testToMessageIdStringWithStringBeginningWithEncodingPrefixForNoIdPrefix() {
String stringMessageId = AMQPMessageIdHelper.AMQP_NO_PREFIX + "myStringId";
String expected = AMQPMessageIdHelper.JMS_ID_PREFIX + AMQPMessageIdHelper.AMQP_NO_PREFIX + stringMessageId;
doToMessageIdTestImpl(stringMessageId, expected);
}
/**
* Test that {@link AMQPMessageIdHelper#toMessageIdString(Object)} returns a
* string indicating an AMQP encoded UUID when given a UUID object.
*/
@Test
public void testToMessageIdStringWithUUID() {
UUID uuidMessageId = UUID.randomUUID();
String expected = AMQPMessageIdHelper.AMQP_UUID_PREFIX + uuidMessageId.toString();
String expected = AMQPMessageIdHelper.JMS_ID_PREFIX + AMQPMessageIdHelper.AMQP_UUID_PREFIX + uuidMessageId.toString();
String baseMessageIdString = messageIdHelper.toBaseMessageIdString(uuidMessageId);
assertNotNull("null string should not have been returned", baseMessageIdString);
assertEquals("expected base id string was not returned", expected, baseMessageIdString);
doToMessageIdTestImpl(uuidMessageId, expected);
}
/**
* Test that {@link AMQPMessageIdHelper#toBaseMessageIdString(Object)} returns a string
* indicating an AMQP encoded ulong when given a UnsignedLong object.
* Test that {@link AMQPMessageIdHelper#toMessageIdString(Object)} returns a
* string indicating an AMQP encoded ulong when given a UnsignedLong object.
*/
@Test
public void testToBaseMessageIdStringWithUnsignedLong() {
public void testToMessageIdStringWithUnsignedLong() {
UnsignedLong uLongMessageId = UnsignedLong.valueOf(123456789L);
String expected = AMQPMessageIdHelper.AMQP_ULONG_PREFIX + uLongMessageId.toString();
String expected = AMQPMessageIdHelper.JMS_ID_PREFIX + AMQPMessageIdHelper.AMQP_ULONG_PREFIX + uLongMessageId.toString();
String baseMessageIdString = messageIdHelper.toBaseMessageIdString(uLongMessageId);
assertNotNull("null string should not have been returned", baseMessageIdString);
assertEquals("expected base id string was not returned", expected, baseMessageIdString);
doToMessageIdTestImpl(uLongMessageId, expected);
}
/**
* Test that {@link AMQPMessageIdHelper#toBaseMessageIdString(Object)} returns a string
* indicating an AMQP encoded binary when given a Binary object.
* Test that {@link AMQPMessageIdHelper#toMessageIdString(Object)} returns a
* string indicating an AMQP encoded binary when given a Binary object.
*/
@Test
public void testToBaseMessageIdStringWithBinary() {
public void testToMessageIdStringWithBinary() {
byte[] bytes = new byte[] {(byte) 0x00, (byte) 0xAB, (byte) 0x09, (byte) 0xFF};
Binary binary = new Binary(bytes);
String expected = AMQPMessageIdHelper.AMQP_BINARY_PREFIX + "00AB09FF";
String expected = AMQPMessageIdHelper.JMS_ID_PREFIX + AMQPMessageIdHelper.AMQP_BINARY_PREFIX + "00AB09FF";
String baseMessageIdString = messageIdHelper.toBaseMessageIdString(binary);
assertNotNull("null string should not have been returned", baseMessageIdString);
assertEquals("expected base id string was not returned", expected, baseMessageIdString);
doToMessageIdTestImpl(binary, expected);
}
/**
* Test that {@link AMQPMessageIdHelper#toIdObject(String)} returns an UnsignedLong when
* given a string indicating an encoded AMQP ulong id.
* Test that {@link AMQPMessageIdHelper#toMessageIdString(Object)} returns a
* string indicating an escaped string, when given an input string that
* already has the "ID:" prefix, but follows it with an encoding prefix, in
* this case the {@link AMQPMessageIdHelper#AMQP_STRING_PREFIX}.
*/
@Test
public void testToMessageIdStringWithStringBeginningWithIdAndEncodingPrefixForString() {
String unescapedStringPrefixMessageId = AMQPMessageIdHelper.JMS_ID_PREFIX + AMQPMessageIdHelper.AMQP_STRING_PREFIX + "id-content";
String expected = AMQPMessageIdHelper.JMS_ID_PREFIX + AMQPMessageIdHelper.AMQP_STRING_PREFIX + unescapedStringPrefixMessageId;
doToMessageIdTestImpl(unescapedStringPrefixMessageId, expected);
}
/**
* Test that {@link AMQPMessageIdHelper#toMessageIdString(Object)} returns a
* string indicating an escaped string, when given an input string that
* already has the "ID:" prefix, but follows it with an encoding prefix, in
* this case the {@link AMQPMessageIdHelper#AMQP_UUID_PREFIX}.
*/
@Test
public void testToMessageIdStringWithStringBeginningWithIdAndEncodingPrefixForUUID() {
String unescapedUuidPrefixMessageId = AMQPMessageIdHelper.JMS_ID_PREFIX + AMQPMessageIdHelper.AMQP_UUID_PREFIX + UUID.randomUUID();
String expected = AMQPMessageIdHelper.JMS_ID_PREFIX + AMQPMessageIdHelper.AMQP_STRING_PREFIX + unescapedUuidPrefixMessageId;
doToMessageIdTestImpl(unescapedUuidPrefixMessageId, expected);
}
/**
* Test that {@link AMQPMessageIdHelper#toMessageIdString(Object)} returns a
* string indicating an escaped string, when given an input string that
* already has the "ID:" prefix, but follows it with an encoding prefix, in
* this case the {@link AMQPMessageIdHelper#AMQP_ULONG_PREFIX}.
*/
@Test
public void testToMessageIdStringWithStringBeginningWithIdAndEncodingPrefixForUlong() {
String unescapedUlongPrefixMessageId = AMQPMessageIdHelper.JMS_ID_PREFIX + AMQPMessageIdHelper.AMQP_ULONG_PREFIX + "42";
String expected = AMQPMessageIdHelper.JMS_ID_PREFIX + AMQPMessageIdHelper.AMQP_STRING_PREFIX + unescapedUlongPrefixMessageId;
doToMessageIdTestImpl(unescapedUlongPrefixMessageId, expected);
}
/**
* Test that {@link AMQPMessageIdHelper#toMessageIdString(Object)} returns a
* string indicating an escaped string, when given an input string that
* already has the "ID:" prefix, but follows it with an encoding prefix, in
* this case the {@link AMQPMessageIdHelper#AMQP_BINARY_PREFIX}.
*/
@Test
public void testToMessageIdStringWithStringBeginningWithIdAndEncodingPrefixForBinary() {
String unescapedBinaryPrefixMessageId = AMQPMessageIdHelper.JMS_ID_PREFIX + AMQPMessageIdHelper.AMQP_BINARY_PREFIX + "ABCDEF";
String expected = AMQPMessageIdHelper.JMS_ID_PREFIX + AMQPMessageIdHelper.AMQP_STRING_PREFIX + unescapedBinaryPrefixMessageId;
doToMessageIdTestImpl(unescapedBinaryPrefixMessageId, expected);
}
/**
* Test that {@link AMQPMessageIdHelper#toMessageIdString(Object)} returns a
* string indicating an escaped string, when given an input string that
* already has the "ID:" prefix, but follows it with an encoding prefix, in
* this case the {@link AMQPMessageIdHelper#AMQP_NO_PREFIX}.
*/
@Test
public void testToMessageIdStringWithStringBeginningWithIdAndEncodingPrefixForNoIDPrefix() {
String unescapedNoPrefixPrefixedMessageId = AMQPMessageIdHelper.JMS_ID_PREFIX + AMQPMessageIdHelper.AMQP_NO_PREFIX + "id-content";
String expected = AMQPMessageIdHelper.JMS_ID_PREFIX + AMQPMessageIdHelper.AMQP_STRING_PREFIX + unescapedNoPrefixPrefixedMessageId;
doToMessageIdTestImpl(unescapedNoPrefixPrefixedMessageId, expected);
}
/**
* Test that {@link AMQPMessageIdHelper#toCorrelationIdString(Object)}
* returns null if given null
*/
@Test
public void testToCorrelationIdStringWithNull() {
assertNull("null string should have been returned", messageIdHelper.toCorrelationIdString(null));
}
/**
* Test that {@link AMQPMessageIdHelper#toCorrelationIdString(Object)} throws
* an IAE if given an unexpected object type.
*/
@Test
public void testToCorrelationIdStringThrowsIAEWithUnexpectedType() {
try {
messageIdHelper.toCorrelationIdString(new Object());
fail("expected exception not thrown");
} catch (IllegalArgumentException iae) {
// expected
}
}
private void doToCorrelationIDTestImpl(Object idObject, String expected) {
String idString = messageIdHelper.toCorrelationIdString(idObject);
assertNotNull("null string should not have been returned", idString);
assertEquals("expected id string was not returned", expected, idString);
}
/**
* Test that {@link AMQPMessageIdHelper#toCorrelationIdString(Object)}
* returns the given basic string unchanged when it has the "ID:" prefix (but
* no others).
*/
@Test
public void testToCorrelationIdStringWithString() {
String stringId = "ID:myCorrelationIdString";
doToCorrelationIDTestImpl(stringId, stringId);
}
/**
* Test that {@link AMQPMessageIdHelper#toCorrelationIdString(Object)}
* returns the given basic string unchanged when it lacks the "ID:" prefix
* (and any others)
*/
@Test
public void testToCorrelationIdStringWithStringNoPrefix() {
String stringNoId = "myCorrelationIdString";
doToCorrelationIDTestImpl(stringNoId, stringNoId);
}
/**
* Test that {@link AMQPMessageIdHelper#toCorrelationIdString(Object)}
* returns a string unchanged when it lacks the "ID:" prefix but happens to
* already begin with the {@link AMQPMessageIdHelper#AMQP_UUID_PREFIX}.
*/
@Test
public void testToCorrelationIdStringWithStringBeginningWithEncodingPrefixForUUID() {
String uuidPrefixStringCorrelationId = AMQPMessageIdHelper.AMQP_UUID_PREFIX + UUID.randomUUID();
doToCorrelationIDTestImpl(uuidPrefixStringCorrelationId, uuidPrefixStringCorrelationId);
}
/**
* Test that {@link AMQPMessageIdHelper#toCorrelationIdString(Object)}
* returns a string unchanged when it lacks the "ID:" prefix but happens to
* already begin with the {@link AMQPMessageIdHelper#AMQP_ULONG_PREFIX}.
*/
@Test
public void testToCorrelationIdStringWithStringBeginningWithEncodingPrefixForLong() {
String ulongPrefixStringCorrelationId = AMQPMessageIdHelper.AMQP_ULONG_PREFIX + Long.valueOf(123456789L);
doToCorrelationIDTestImpl(ulongPrefixStringCorrelationId, ulongPrefixStringCorrelationId);
}
/**
* Test that {@link AMQPMessageIdHelper#toCorrelationIdString(Object)}
* returns a string unchanged when it lacks the "ID:" prefix but happens to
* already begin with the {@link AMQPMessageIdHelper#AMQP_BINARY_PREFIX}.
*/
@Test
public void testToCorrelationIdStringWithStringBeginningWithEncodingPrefixForBinary() {
String binaryPrefixStringCorrelationId = AMQPMessageIdHelper.AMQP_BINARY_PREFIX + "0123456789ABCDEF";
doToCorrelationIDTestImpl(binaryPrefixStringCorrelationId, binaryPrefixStringCorrelationId);
}
/**
* Test that {@link AMQPMessageIdHelper#toCorrelationIdString(Object)}
* returns a string unchanged when it lacks the "ID:" prefix but happens to
* already begin with the {@link AMQPMessageIdHelper#AMQP_STRING_PREFIX}.
*/
@Test
public void testToCorrelationIdStringWithStringBeginningWithEncodingPrefixForString() {
String stringPrefixCorrelationId = AMQPMessageIdHelper.AMQP_STRING_PREFIX + "myStringId";
doToCorrelationIDTestImpl(stringPrefixCorrelationId, stringPrefixCorrelationId);
}
/**
* Test that {@link AMQPMessageIdHelper#toCorrelationIdString(Object)}
* returns a string unchanged when it lacks the "ID:" prefix but happens to
* already begin with the {@link AMQPMessageIdHelper#AMQP_NO_PREFIX}.
*/
@Test
public void testToCorrelationIdStringWithStringBeginningWithEncodingPrefixForNoIdPrefix() {
String noPrefixStringCorrelationId = AMQPMessageIdHelper.AMQP_NO_PREFIX + "myStringId";
doToCorrelationIDTestImpl(noPrefixStringCorrelationId, noPrefixStringCorrelationId);
}
/**
* Test that {@link AMQPMessageIdHelper#toCorrelationIdString(Object)}
* returns a string indicating an AMQP encoded UUID when given a UUID object.
*/
@Test
public void testToCorrelationIdStringWithUUID() {
UUID uuidCorrelationId = UUID.randomUUID();
String expected = AMQPMessageIdHelper.JMS_ID_PREFIX + AMQPMessageIdHelper.AMQP_UUID_PREFIX + uuidCorrelationId.toString();
doToCorrelationIDTestImpl(uuidCorrelationId, expected);
}
/**
* Test that {@link AMQPMessageIdHelper#toCorrelationIdString(Object)}
* returns a string indicating an AMQP encoded ulong when given a
* UnsignedLong object.
*/
@Test
public void testToCorrelationIdStringWithUnsignedLong() {
UnsignedLong uLongCorrelationId = UnsignedLong.valueOf(123456789L);
String expected = AMQPMessageIdHelper.JMS_ID_PREFIX + AMQPMessageIdHelper.AMQP_ULONG_PREFIX + uLongCorrelationId.toString();
doToCorrelationIDTestImpl(uLongCorrelationId, expected);
}
/**
* Test that {@link AMQPMessageIdHelper#toCorrelationIdString(Object)}
* returns a string indicating an AMQP encoded binary when given a Binary
* object.
*/
@Test
public void testToCorrelationIdStringWithBinary() {
byte[] bytes = new byte[] {(byte) 0x00, (byte) 0xAB, (byte) 0x09, (byte) 0xFF};
Binary binary = new Binary(bytes);
String expected = AMQPMessageIdHelper.JMS_ID_PREFIX + AMQPMessageIdHelper.AMQP_BINARY_PREFIX + "00AB09FF";
doToCorrelationIDTestImpl(binary, expected);
}
/**
* Test that {@link AMQPMessageIdHelper#toCorrelationIdString(Object)}
* returns a string indicating an escaped string, when given an input string
* that already has the "ID:" prefix, but follows it with an encoding prefix,
* in this case the {@link AMQPMessageIdHelper#AMQP_STRING_PREFIX}.
*/
@Test
public void testToCorrelationIdStringWithStringBeginningWithIdAndEncodingPrefixForString() {
String unescapedStringPrefixCorrelationId = AMQPMessageIdHelper.JMS_ID_PREFIX + AMQPMessageIdHelper.AMQP_STRING_PREFIX + "id-content";
String expected = AMQPMessageIdHelper.JMS_ID_PREFIX + AMQPMessageIdHelper.AMQP_STRING_PREFIX + unescapedStringPrefixCorrelationId;
doToCorrelationIDTestImpl(unescapedStringPrefixCorrelationId, expected);
}
/**
* Test that {@link AMQPMessageIdHelper#toCorrelationIdString(Object)}
* returns a string indicating an escaped string, when given an input string
* that already has the "ID:" prefix, but follows it with an encoding prefix,
* in this case the {@link AMQPMessageIdHelper#AMQP_UUID_PREFIX}.
*/
@Test
public void testToCorrelationIdStringWithStringBeginningWithIdAndEncodingPrefixForUUID() {
String unescapedUuidPrefixCorrelationId = AMQPMessageIdHelper.JMS_ID_PREFIX + AMQPMessageIdHelper.AMQP_UUID_PREFIX + UUID.randomUUID();
String expected = AMQPMessageIdHelper.JMS_ID_PREFIX + AMQPMessageIdHelper.AMQP_STRING_PREFIX + unescapedUuidPrefixCorrelationId;
doToCorrelationIDTestImpl(unescapedUuidPrefixCorrelationId, expected);
}
/**
* Test that {@link AMQPMessageIdHelper#toCorrelationIdString(Object)}
* returns a string indicating an escaped string, when given an input string
* that already has the "ID:" prefix, but follows it with an encoding prefix,
* in this case the {@link AMQPMessageIdHelper#AMQP_ULONG_PREFIX}.
*/
@Test
public void testToCorrelationIdStringWithStringBeginningWithIdAndEncodingPrefixForUlong() {
String unescapedUlongPrefixCorrelationId = AMQPMessageIdHelper.JMS_ID_PREFIX + AMQPMessageIdHelper.AMQP_ULONG_PREFIX + "42";
String expected = AMQPMessageIdHelper.JMS_ID_PREFIX + AMQPMessageIdHelper.AMQP_STRING_PREFIX + unescapedUlongPrefixCorrelationId;
doToCorrelationIDTestImpl(unescapedUlongPrefixCorrelationId, expected);
}
/**
* Test that {@link AMQPMessageIdHelper#toCorrelationIdString(Object)}
* returns a string indicating an escaped string, when given an input string
* that already has the "ID:" prefix, but follows it with an encoding prefix,
* in this case the {@link AMQPMessageIdHelper#AMQP_BINARY_PREFIX}.
*/
@Test
public void testToCorrelationIdStringWithStringBeginningWithIdAndEncodingPrefixForBinary() {
String unescapedBinaryPrefixCorrelationId = AMQPMessageIdHelper.JMS_ID_PREFIX + AMQPMessageIdHelper.AMQP_BINARY_PREFIX + "ABCDEF";
String expected = AMQPMessageIdHelper.JMS_ID_PREFIX + AMQPMessageIdHelper.AMQP_STRING_PREFIX + unescapedBinaryPrefixCorrelationId;
doToCorrelationIDTestImpl(unescapedBinaryPrefixCorrelationId, expected);
}
/**
* Test that {@link AMQPMessageIdHelper#toCorrelationIdString(Object)}
* returns a string indicating an escaped string, when given an input string
* that already has the "ID:" prefix, but follows it with an encoding prefix,
* in this case the {@link AMQPMessageIdHelper#AMQP_NO_PREFIX}.
*/
@Test
public void testToCorrelationIdStringWithStringBeginningWithIdAndEncodingPrefixForNoIDPrefix() {
String unescapedNoPrefixCorrelationId = AMQPMessageIdHelper.JMS_ID_PREFIX + AMQPMessageIdHelper.AMQP_NO_PREFIX + "id-content";
String expected = AMQPMessageIdHelper.JMS_ID_PREFIX + AMQPMessageIdHelper.AMQP_STRING_PREFIX + unescapedNoPrefixCorrelationId;
doToCorrelationIDTestImpl(unescapedNoPrefixCorrelationId, expected);
}
private void doToIdObjectTestImpl(String idString, Object expected) throws ActiveMQAMQPIllegalStateException {
Object idObject = messageIdHelper.toIdObject(idString);
assertNotNull("null object should not have been returned", idObject);
assertEquals("expected id object was not returned", expected, idObject);
}
/**
* Test that {@link AMQPMessageIdHelper#toIdObject(String)} returns an
* UnsignedLong when given a string indicating an encoded AMQP ulong id.
*
* @throws Exception
* if an error occurs during the test.
@ -194,16 +558,15 @@ public class AMQPMessageIdHelperTest {
@Test
public void testToIdObjectWithEncodedUlong() throws Exception {
UnsignedLong longId = UnsignedLong.valueOf(123456789L);
String provided = AMQPMessageIdHelper.AMQP_ULONG_PREFIX + "123456789";
String provided = AMQPMessageIdHelper.JMS_ID_PREFIX + AMQPMessageIdHelper.AMQP_ULONG_PREFIX + "123456789";
Object idObject = messageIdHelper.toIdObject(provided);
assertNotNull("null object should not have been returned", idObject);
assertEquals("expected id object was not returned", longId, idObject);
doToIdObjectTestImpl(provided, longId);
}
/**
* Test that {@link AMQPMessageIdHelper#toIdObject(String)} returns a Binary when given a
* string indicating an encoded AMQP binary id, using upper case hex characters
* Test that {@link AMQPMessageIdHelper#toIdObject(String)} returns a Binary
* when given a string indicating an encoded AMQP binary id, using upper case
* hex characters
*
* @throws Exception
* if an error occurs during the test.
@ -213,15 +576,14 @@ public class AMQPMessageIdHelperTest {
byte[] bytes = new byte[] {(byte) 0x00, (byte) 0xAB, (byte) 0x09, (byte) 0xFF};
Binary binaryId = new Binary(bytes);
String provided = AMQPMessageIdHelper.AMQP_BINARY_PREFIX + "00AB09FF";
String provided = AMQPMessageIdHelper.JMS_ID_PREFIX + AMQPMessageIdHelper.AMQP_BINARY_PREFIX + "00AB09FF";
Object idObject = messageIdHelper.toIdObject(provided);
assertNotNull("null object should not have been returned", idObject);
assertEquals("expected id object was not returned", binaryId, idObject);
doToIdObjectTestImpl(provided, binaryId);
}
/**
* Test that {@link AMQPMessageIdHelper#toIdObject(String)} returns null when given null.
* Test that {@link AMQPMessageIdHelper#toIdObject(String)} returns null when
* given null.
*
* @throws Exception
* if an error occurs during the test.
@ -232,8 +594,9 @@ public class AMQPMessageIdHelperTest {
}
/**
* Test that {@link AMQPMessageIdHelper#toIdObject(String)} returns a Binary when given a
* string indicating an encoded AMQP binary id, using lower case hex characters.
* Test that {@link AMQPMessageIdHelper#toIdObject(String)} returns a Binary
* when given a string indicating an encoded AMQP binary id, using lower case
* hex characters.
*
* @throws Exception
* if an error occurs during the test.
@ -243,16 +606,14 @@ public class AMQPMessageIdHelperTest {
byte[] bytes = new byte[] {(byte) 0x00, (byte) 0xAB, (byte) 0x09, (byte) 0xFF};
Binary binaryId = new Binary(bytes);
String provided = AMQPMessageIdHelper.AMQP_BINARY_PREFIX + "00ab09ff";
String provided = AMQPMessageIdHelper.JMS_ID_PREFIX + AMQPMessageIdHelper.AMQP_BINARY_PREFIX + "00ab09ff";
Object idObject = messageIdHelper.toIdObject(provided);
assertNotNull("null object should not have been returned", idObject);
assertEquals("expected id object was not returned", binaryId, idObject);
doToIdObjectTestImpl(provided, binaryId);
}
/**
* Test that {@link AMQPMessageIdHelper#toIdObject(String)} returns a UUID when given a
* string indicating an encoded AMQP uuid id.
* Test that {@link AMQPMessageIdHelper#toIdObject(String)} returns a UUID
* when given a string indicating an encoded AMQP uuid id.
*
* @throws Exception
* if an error occurs during the test.
@ -260,133 +621,170 @@ public class AMQPMessageIdHelperTest {
@Test
public void testToIdObjectWithEncodedUuid() throws Exception {
UUID uuid = UUID.randomUUID();
String provided = AMQPMessageIdHelper.AMQP_UUID_PREFIX + uuid.toString();
String provided = AMQPMessageIdHelper.JMS_ID_PREFIX + AMQPMessageIdHelper.AMQP_UUID_PREFIX + uuid.toString();
Object idObject = messageIdHelper.toIdObject(provided);
assertNotNull("null object should not have been returned", idObject);
assertEquals("expected id object was not returned", uuid, idObject);
doToIdObjectTestImpl(provided, uuid);
}
/**
* Test that {@link AMQPMessageIdHelper#toIdObject(String)} returns a string when given a
* string without any type encoding prefix.
* Test that {@link AMQPMessageIdHelper#toIdObject(String)} returns a string
* unchanged when given a string without any prefix.
*
* @throws Exception
* if an error occurs during the test.
*/
@Test
public void testToIdObjectWithStringContainingNoEncodingPrefix() throws Exception {
public void testToIdObjectWithAppSpecificString() throws Exception {
String stringId = "myStringId";
Object idObject = messageIdHelper.toIdObject(stringId);
assertNotNull("null object should not have been returned", idObject);
assertEquals("expected id object was not returned", stringId, idObject);
doToIdObjectTestImpl(stringId, stringId);
}
/**
* Test that {@link AMQPMessageIdHelper#toIdObject(String)} returns the remainder of the
* provided string after removing the {@link AMQPMessageIdHelper#AMQP_STRING_PREFIX} prefix.
* Test that {@link AMQPMessageIdHelper#toIdObject(String)} returns a string
* unchanged when given a string with only the 'ID:' prefix.
*
* @throws Exception
* if an error occurs during the test.
*/
@Test
public void testToIdObjectWithStringContainingStringEncodingPrefix() throws Exception {
public void testToIdObjectWithSimplIdString() throws Exception {
String stringId = "ID:myStringId";
doToIdObjectTestImpl(stringId, stringId);
}
/**
* Test that {@link AMQPMessageIdHelper#toIdObject(String)} returns the
* remainder of the provided string after removing the 'ID:' and
* {@link AMQPMessageIdHelper#AMQP_NO_PREFIX} prefix used to indicate it
* originally had no 'ID:' prefix [when arriving as a message id].
*
* @throws Exception
* if an error occurs during the test.
*/
@Test
public void testToIdObjectWithStringContainingEncodingPrefixForNoIdPrefix() throws Exception {
String suffix = "myStringSuffix";
String stringId = AMQPMessageIdHelper.AMQP_STRING_PREFIX + suffix;
String stringId = AMQPMessageIdHelper.JMS_ID_PREFIX + AMQPMessageIdHelper.AMQP_NO_PREFIX + suffix;
Object idObject = messageIdHelper.toIdObject(stringId);
assertNotNull("null object should not have been returned", idObject);
assertEquals("expected id object was not returned", suffix, idObject);
doToIdObjectTestImpl(stringId, suffix);
}
/**
* Test that when given a string with with the {@link AMQPMessageIdHelper#AMQP_STRING_PREFIX}
* prefix and then additionally the {@link AMQPMessageIdHelper#AMQP_UUID_PREFIX}, the
* {@link AMQPMessageIdHelper#toIdObject(String)} method returns the remainder of the
* provided string after removing the {@link AMQPMessageIdHelper#AMQP_STRING_PREFIX} prefix.
* Test that {@link AMQPMessageIdHelper#toIdObject(String)} returns the
* remainder of the provided string after removing the
* {@link AMQPMessageIdHelper#AMQP_STRING_PREFIX} prefix.
*
* @throws Exception
* if an error occurs during the test.
*/
@Test
public void testToIdObjectWithStringContainingStringEncodingPrefixAndThenUuidPrefix() throws Exception {
String encodedUuidString = AMQPMessageIdHelper.AMQP_UUID_PREFIX + UUID.randomUUID().toString();
String stringId = AMQPMessageIdHelper.AMQP_STRING_PREFIX + encodedUuidString;
public void testToIdObjectWithStringContainingIdStringEncodingPrefix() throws Exception {
String suffix = "myStringSuffix";
String stringId = AMQPMessageIdHelper.JMS_ID_PREFIX + AMQPMessageIdHelper.AMQP_STRING_PREFIX + suffix;
Object idObject = messageIdHelper.toIdObject(stringId);
assertNotNull("null object should not have been returned", idObject);
assertEquals("expected id object was not returned", encodedUuidString, idObject);
doToIdObjectTestImpl(stringId, suffix);
}
/**
* Test that when given a string with with the
* {@link AMQPMessageIdHelper#AMQP_STRING_PREFIX} prefix and then
* additionally the {@link AMQPMessageIdHelper#AMQP_UUID_PREFIX}, the
* {@link AMQPMessageIdHelper#toIdObject(String)} method returns the
* remainder of the provided string after removing the
* {@link AMQPMessageIdHelper#AMQP_STRING_PREFIX} prefix.
*
* @throws Exception
* if an error occurs during the test.
*/
@Test
public void testToIdObjectWithStringContainingIdStringEncodingPrefixAndThenUuidPrefix() throws Exception {
String encodedUuidString = AMQPMessageIdHelper.AMQP_UUID_PREFIX + UUID.randomUUID().toString();
String stringId = AMQPMessageIdHelper.JMS_ID_PREFIX + AMQPMessageIdHelper.AMQP_STRING_PREFIX + encodedUuidString;
doToIdObjectTestImpl(stringId, encodedUuidString);
}
/**
* Test that {@link AMQPMessageIdHelper#toIdObject(String)} throws an
* {@link IdConversionException} when presented with an encoded binary hex string of uneven
* length (after the prefix) that thus can't be converted due to each byte using 2 characters
* {@link IdConversionException} when presented with an encoded binary hex
* string of uneven length (after the prefix) that thus can't be converted
* due to each byte using 2 characters
*/
@Test
public void testToIdObjectWithStringContainingBinaryHexThrowsWithUnevenLengthString() {
String unevenHead = AMQPMessageIdHelper.AMQP_BINARY_PREFIX + "123";
public void testToIdObjectWithStringContainingBinaryHexThrowsICEWithUnevenLengthString() {
String unevenHead = AMQPMessageIdHelper.JMS_ID_PREFIX + AMQPMessageIdHelper.AMQP_BINARY_PREFIX + "123";
try {
messageIdHelper.toIdObject(unevenHead);
fail("expected exception was not thrown");
} catch (ActiveMQAMQPException ex) {
} catch (ActiveMQAMQPIllegalStateException iae) {
// expected
String msg = iae.getMessage();
assertTrue("Message was not as expected: " + msg, msg.contains("even length"));
}
}
/**
* Test that {@link AMQPMessageIdHelper#toIdObject(String)} throws an
* {@link IdConversionException} when presented with an encoded binary hex string (after the
* prefix) that contains characters other than 0-9 and A-F and a-f, and thus can't be
* converted
* {@link IdConversionException} when presented with an encoded binary hex
* string (after the prefix) that contains characters other than 0-9 and A-F
* and a-f, and thus can't be converted
*/
@Test
public void testToIdObjectWithStringContainingBinaryHexThrowsWithNonHexCharacters() {
public void testToIdObjectWithStringContainingBinaryHexThrowsICEWithNonHexCharacters() {
// char before '0'
char nonHexChar = '/';
String nonHexString = AMQPMessageIdHelper.AMQP_BINARY_PREFIX + nonHexChar + nonHexChar;
String nonHexString = AMQPMessageIdHelper.JMS_ID_PREFIX + AMQPMessageIdHelper.AMQP_BINARY_PREFIX + nonHexChar + nonHexChar;
try {
messageIdHelper.toIdObject(nonHexString);
fail("expected exception was not thrown");
} catch (ActiveMQAMQPException ex) {
} catch (ActiveMQAMQPIllegalStateException ice) {
// expected
String msg = ice.getMessage();
assertTrue("Message was not as expected: " + msg, msg.contains("non-hex"));
}
// char after '9', before 'A'
nonHexChar = ':';
nonHexString = AMQPMessageIdHelper.AMQP_BINARY_PREFIX + nonHexChar + nonHexChar;
nonHexString = AMQPMessageIdHelper.JMS_ID_PREFIX + AMQPMessageIdHelper.AMQP_BINARY_PREFIX + nonHexChar + nonHexChar;
try {
messageIdHelper.toIdObject(nonHexString);
fail("expected exception was not thrown");
} catch (ActiveMQAMQPException ex) {
} catch (ActiveMQAMQPIllegalStateException iae) {
// expected
String msg = iae.getMessage();
assertTrue("Message was not as expected: " + msg, msg.contains("non-hex"));
}
// char after 'F', before 'a'
nonHexChar = 'G';
nonHexString = AMQPMessageIdHelper.AMQP_BINARY_PREFIX + nonHexChar + nonHexChar;
nonHexString = AMQPMessageIdHelper.JMS_ID_PREFIX + AMQPMessageIdHelper.AMQP_BINARY_PREFIX + nonHexChar + nonHexChar;
try {
messageIdHelper.toIdObject(nonHexString);
fail("expected exception was not thrown");
} catch (ActiveMQAMQPException ex) {
} catch (ActiveMQAMQPIllegalStateException iae) {
// expected
String msg = iae.getMessage();
assertTrue("Message was not as expected: " + msg, msg.contains("non-hex"));
}
// char after 'f'
nonHexChar = 'g';
nonHexString = AMQPMessageIdHelper.AMQP_BINARY_PREFIX + nonHexChar + nonHexChar;
nonHexString = AMQPMessageIdHelper.JMS_ID_PREFIX + AMQPMessageIdHelper.AMQP_BINARY_PREFIX + nonHexChar + nonHexChar;
try {
messageIdHelper.toIdObject(nonHexString);
fail("expected exception was not thrown");
} catch (ActiveMQAMQPException ex) {
} catch (ActiveMQAMQPIllegalStateException ice) {
// expected
String msg = ice.getMessage();
assertTrue("Message was not as expected: " + msg, msg.contains("non-hex"));
}
}
}

View File

@ -389,6 +389,31 @@ public class AmqpMessage {
return message.getProperties().getGroupId();
}
/**
* Sets the Subject property on an outbound message using the provided String
*
* @param subject the String Subject value to set.
*/
public void setSubject(String subject) {
checkReadOnly();
lazyCreateProperties();
getWrappedMessage().setSubject(subject);
}
/**
* Return the set Subject value in String form, if there are no properties
* in the given message return null.
*
* @return the set Subject in String form or null if not set.
*/
public String getSubject() {
if (message.getProperties() == null) {
return null;
}
return message.getProperties().getSubject();
}
/**
* Sets the durable header on the outgoing message.
*

View File

@ -418,6 +418,37 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
connection.close();
}
@Test(timeout = 60000)
public void testReceiveWithJMSSelectorFilterOnJMSType() throws Exception {
AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession();
AmqpMessage message1 = new AmqpMessage();
message1.setText("msg:1");
AmqpMessage message2 = new AmqpMessage();
message2.setSubject("target");
message2.setText("msg:2");
AmqpSender sender = session.createSender(getQueueName());
sender.send(message1);
sender.send(message2);
sender.close();
AmqpReceiver receiver = session.createReceiver(getQueueName(), "JMSType = 'target'");
receiver.flow(2);
AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS);
assertNotNull("Should have read a message", received);
assertEquals("target", received.getSubject());
received.accept();
assertNull(receiver.receive(1, TimeUnit.SECONDS));
receiver.close();
connection.close();
}
@Test(timeout = 60000)
public void testAdvancedLinkFlowControl() throws Exception {
final int MSG_COUNT = 20;

View File

@ -16,6 +16,13 @@
*/
package org.apache.activemq.artemis.tests.integration.amqp;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
@ -28,11 +35,6 @@ import javax.jms.MessageProducer;
import javax.jms.QueueBrowser;
import javax.jms.Session;
import javax.jms.TextMessage;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.tests.util.Wait;
@ -167,13 +169,29 @@ public class JMSMessageConsumerTest extends JMSClientTestSupport {
}
@Test(timeout = 60000)
public void testSelector() throws Exception {
public void testSelectorOnTopic() throws Exception {
doTestSelector(true);
}
@Test(timeout = 60000)
public void testSelectorOnQueue() throws Exception {
doTestSelector(false);
}
private void doTestSelector(boolean topic) throws Exception {
Connection connection = createConnection();
try {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
javax.jms.Queue queue = session.createQueue(getQueueName());
MessageProducer producer = session.createProducer(queue);
Destination destination = null;
if (topic) {
destination = session.createTopic(getTopicName());
} else {
destination = session.createQueue(getQueueName());
}
MessageProducer producer = session.createProducer(destination);
MessageConsumer messageConsumer = session.createConsumer(destination, "color = 'RED'");
TextMessage message = session.createTextMessage();
message.setText("msg:0");
@ -185,7 +203,6 @@ public class JMSMessageConsumerTest extends JMSClientTestSupport {
connection.start();
MessageConsumer messageConsumer = session.createConsumer(queue, "color = 'RED'");
TextMessage m = (TextMessage) messageConsumer.receive(5000);
assertNotNull(m);
assertEquals("msg:1", m.getText());
@ -195,38 +212,43 @@ public class JMSMessageConsumerTest extends JMSClientTestSupport {
}
}
@SuppressWarnings("rawtypes")
@Test(timeout = 30000)
public void testSelectorsWithJMSType() throws Exception {
Connection connection = createConnection();
public void testSelectorsWithJMSTypeOnTopic() throws Exception {
doTestSelectorsWithJMSType(true);
}
@Test(timeout = 30000)
public void testSelectorsWithJMSTypeOnQueue() throws Exception {
doTestSelectorsWithJMSType(false);
}
private void doTestSelectorsWithJMSType(boolean topic) throws Exception {
final Connection connection = createConnection();
final String type = "myJMSType";
try {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
javax.jms.Queue queue = session.createQueue(getQueueName());
MessageProducer p = session.createProducer(queue);
TextMessage message = session.createTextMessage();
message.setText("text");
p.send(message, DeliveryMode.NON_PERSISTENT, Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE);
TextMessage message2 = session.createTextMessage();
String type = "myJMSType";
message2.setJMSType(type);
message2.setText("text + type");
p.send(message2, DeliveryMode.NON_PERSISTENT, Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE);
QueueBrowser browser = session.createBrowser(queue);
Enumeration enumeration = browser.getEnumeration();
int count = 0;
while (enumeration.hasMoreElements()) {
Message m = (Message) enumeration.nextElement();
assertTrue(m instanceof TextMessage);
count++;
Destination destination = null;
if (topic) {
destination = session.createTopic(getTopicName());
} else {
destination = session.createQueue(getQueueName());
}
assertEquals(2, count);
MessageProducer producer = session.createProducer(destination);
MessageConsumer consumer = session.createConsumer(destination, "JMSType = '" + type + "'");
TextMessage message1 = session.createTextMessage();
message1.setText("text");
producer.send(message1, DeliveryMode.NON_PERSISTENT, Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE);
TextMessage message2 = session.createTextMessage();
message2.setJMSType(type);
message2.setText("text + type");
producer.send(message2, DeliveryMode.NON_PERSISTENT, Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE);
connection.start();
MessageConsumer consumer = session.createConsumer(queue, "JMSType = '" + type + "'");
Message msg = consumer.receive(2000);
assertNotNull(msg);
assertTrue(msg instanceof TextMessage);
@ -237,7 +259,48 @@ public class JMSMessageConsumerTest extends JMSClientTestSupport {
}
}
@SuppressWarnings("rawtypes")
@Test(timeout = 30000)
public void testSelectorsWithJMSCorrelationID() throws Exception {
Connection connection = createConnection();
final String correlationID = UUID.randomUUID().toString();
try {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
javax.jms.Queue queue = session.createQueue(getQueueName());
MessageProducer producer = session.createProducer(queue);
TextMessage message1 = session.createTextMessage();
message1.setText("text");
producer.send(message1);
TextMessage message2 = session.createTextMessage();
message2.setJMSCorrelationID(correlationID);
message2.setText("JMSCorrelationID");
producer.send(message2);
QueueBrowser browser = session.createBrowser(queue);
Enumeration<?> enumeration = browser.getEnumeration();
int count = 0;
while (enumeration.hasMoreElements()) {
Message m = (Message) enumeration.nextElement();
assertTrue(m instanceof TextMessage);
count++;
}
assertEquals(2, count);
MessageConsumer consumer = session.createConsumer(queue, "JMSCorrelationID = '" + correlationID + "'");
Message msg = consumer.receive(2000);
assertNotNull(msg);
assertTrue(msg instanceof TextMessage);
assertEquals("Unexpected JMSCorrelationID value", correlationID, msg.getJMSCorrelationID());
assertEquals("Unexpected message content", "JMSCorrelationID", ((TextMessage) msg).getText());
} finally {
connection.close();
}
}
@Test(timeout = 30000)
public void testSelectorsWithJMSPriority() throws Exception {
Connection connection = createConnection();
@ -245,18 +308,18 @@ public class JMSMessageConsumerTest extends JMSClientTestSupport {
try {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
javax.jms.Queue queue = session.createQueue(getQueueName());
MessageProducer p = session.createProducer(queue);
MessageProducer producer = session.createProducer(queue);
TextMessage message = session.createTextMessage();
message.setText("hello");
p.send(message, DeliveryMode.PERSISTENT, 5, 0);
producer.send(message, DeliveryMode.PERSISTENT, 5, 0);
message = session.createTextMessage();
message.setText("hello + 9");
p.send(message, DeliveryMode.PERSISTENT, 9, 0);
producer.send(message, DeliveryMode.PERSISTENT, 9, 0);
QueueBrowser browser = session.createBrowser(queue);
Enumeration enumeration = browser.getEnumeration();
Enumeration<?> enumeration = browser.getEnumeration();
int count = 0;
while (enumeration.hasMoreElements()) {
Message m = (Message) enumeration.nextElement();
@ -276,8 +339,163 @@ public class JMSMessageConsumerTest extends JMSClientTestSupport {
}
}
@Test(timeout = 30000)
public void testSelectorsWithJMSXGroupIDOnTopic() throws Exception {
doTestSelectorsWithJMSXGroupID(true);
}
@Test(timeout = 30000)
public void testSelectorsWithJMSXGroupIDOnQueue() throws Exception {
doTestSelectorsWithJMSXGroupID(false);
}
private void doTestSelectorsWithJMSXGroupID(boolean topic) throws Exception {
Connection connection = createConnection();
try {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination = null;
if (topic) {
destination = session.createTopic(getTopicName());
} else {
destination = session.createQueue(getQueueName());
}
MessageProducer producer = session.createProducer(destination);
MessageConsumer consumer = session.createConsumer(destination, "JMSXGroupID = '1'");
TextMessage message = session.createTextMessage();
message.setText("group 1 - 1");
message.setStringProperty("JMSXGroupID", "1");
message.setIntProperty("JMSXGroupSeq", 1);
producer.send(message);
message = session.createTextMessage();
message.setText("group 2");
message.setStringProperty("JMSXGroupID", "2");
producer.send(message);
message = session.createTextMessage();
message.setText("group 1 - 2");
message.setStringProperty("JMSXGroupID", "1");
message.setIntProperty("JMSXGroupSeq", -1);
producer.send(message);
connection.start();
Message msg = consumer.receive(2000);
assertNotNull(msg);
assertTrue(msg instanceof TextMessage);
assertEquals("group 1 - 1", ((TextMessage) msg).getText());
msg = consumer.receive(2000);
assertNotNull(msg);
assertTrue(msg instanceof TextMessage);
assertEquals("group 1 - 2", ((TextMessage) msg).getText());
} finally {
connection.close();
}
}
@Test(timeout = 30000)
public void testSelectorsWithJMSDeliveryOnQueue() throws Exception {
final Connection connection = createConnection();
String selector = "JMSDeliveryMode = 'PERSISTENT'";
try {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue(getQueueName());
MessageProducer producer = session.createProducer(destination);
MessageConsumer consumer = session.createConsumer(destination, selector);
TextMessage message1 = session.createTextMessage();
message1.setText("non-persistent");
producer.send(message1, DeliveryMode.NON_PERSISTENT, Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE);
TextMessage message2 = session.createTextMessage();
message2.setText("persistent");
producer.send(message2, DeliveryMode.PERSISTENT, Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE);
connection.start();
Message msg = consumer.receive(2000);
assertNotNull(msg);
assertTrue(msg instanceof TextMessage);
assertEquals("Unexpected JMSDeliveryMode value", DeliveryMode.PERSISTENT, msg.getJMSDeliveryMode());
assertEquals("Unexpected message content", "persistent", ((TextMessage) msg).getText());
} finally {
connection.close();
}
}
@Test(timeout = 30000)
public void testSelectorsWithJMSTimestampOnQueue() throws Exception {
final Connection connection = createConnection();
try {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue(getQueueName());
MessageProducer producer = session.createProducer(destination);
TextMessage message1 = session.createTextMessage();
message1.setText("filtered");
producer.send(message1, DeliveryMode.PERSISTENT, Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE);
TextMessage message2 = session.createTextMessage();
message2.setText("expected");
producer.send(message2, DeliveryMode.PERSISTENT, Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE);
MessageConsumer consumer = session.createConsumer(destination, "JMSTimestamp = " + message2.getJMSTimestamp());
connection.start();
Message msg = consumer.receive(2000);
assertNotNull(msg);
assertTrue(msg instanceof TextMessage);
assertEquals("Unexpected JMSTimestamp value", message2.getJMSTimestamp(), msg.getJMSTimestamp());
assertEquals("Unexpected message content", "expected", ((TextMessage) msg).getText());
} finally {
connection.close();
}
}
@Test(timeout = 30000)
public void testSelectorsWithJMSExpirationOnQueue() throws Exception {
final Connection connection = createConnection();
try {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue(getQueueName());
MessageProducer producer = session.createProducer(destination);
TextMessage message1 = session.createTextMessage();
message1.setText("filtered");
producer.send(message1, DeliveryMode.PERSISTENT, Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE);
TextMessage message2 = session.createTextMessage();
message2.setText("expected");
producer.send(message2, DeliveryMode.PERSISTENT, Message.DEFAULT_PRIORITY, 60000);
MessageConsumer consumer = session.createConsumer(destination, "JMSExpiration = " + message2.getJMSExpiration());
connection.start();
Message msg = consumer.receive(2000);
assertNotNull(msg);
assertTrue(msg instanceof TextMessage);
assertEquals("Unexpected JMSExpiration value", message2.getJMSExpiration(), msg.getJMSExpiration());
assertEquals("Unexpected message content", "expected", ((TextMessage) msg).getText());
} finally {
connection.close();
}
}
@Test(timeout = 60000)
public void testJMSSelectorFiltersJMSMessageID() throws Exception {
public void testJMSSelectorFiltersJMSMessageIDOnTopic() throws Exception {
Connection connection = createConnection();
try {