ARTEMIS-3461: add some tests and resolve various issues spotted with the prior changes

- Avoid blowing up on string bodies of any size if the valueSizeLimit bits are configured to disable limit
- Dont NPE if amqp-value + binary body is sent without a content-type, as it always should be.
- Include expected prefix when adding delivery delay and ingress time annotations.
- Use the actual name for ingress time annotation, as with all other annotations.
- Use correct object type when testing equality with content-type value.
- Use consistent case for 'groupId' in different properties.
This commit is contained in:
Robbie Gemmell 2021-11-01 20:00:15 +00:00 committed by clebertsuconic
parent ea8fe11c6d
commit 3f9de5fa30
5 changed files with 468 additions and 19 deletions

View File

@ -326,7 +326,7 @@ public final class JsonUtil {
} }
public static String truncateString(final String str, final int valueSizeLimit) { public static String truncateString(final String str, final int valueSizeLimit) {
if (str.length() > valueSizeLimit) { if (valueSizeLimit >= 0 && str.length() > valueSizeLimit) {
return new StringBuilder(valueSizeLimit + 32).append(str.substring(0, valueSizeLimit)).append(", + ").append(str.length() - valueSizeLimit).append(" more").toString(); return new StringBuilder(valueSizeLimit + 32).append(str.substring(0, valueSizeLimit)).append(", + ").append(str.length() - valueSizeLimit).append(" more").toString();
} else { } else {
return str; return str;

View File

@ -69,7 +69,8 @@ public class JsonUtilTest {
Assert.assertEquals(6, jsonObject.getJsonArray("byteArray").size()); Assert.assertEquals(6, jsonObject.getJsonArray("byteArray").size());
} }
@Test public void testAddByteArrayToJsonArray() { @Test
public void testAddByteArrayToJsonArray() {
JsonArrayBuilder jsonArrayBuilder = JsonLoader.createArrayBuilder(); JsonArrayBuilder jsonArrayBuilder = JsonLoader.createArrayBuilder();
byte[] bytes = {0x0a, 0x1b, 0x2c, 0x3d, 0x4e, 0x5f}; byte[] bytes = {0x0a, 0x1b, 0x2c, 0x3d, 0x4e, 0x5f};
@ -79,4 +80,44 @@ public class JsonUtilTest {
Assert.assertEquals(1, jsonArray.size()); Assert.assertEquals(1, jsonArray.size());
} }
@Test
public void testTruncateUsingStringWithValueSizeLimit() {
String prefix = "12345";
int valueSizeLimit = prefix.length();
String remaining = "remaining";
String truncated = (String) JsonUtil.truncate(prefix + remaining, valueSizeLimit);
String expected = prefix + ", + " + String.valueOf(remaining.length()) + " more";
Assert.assertEquals(expected, truncated);
}
@Test
public void testTruncateUsingStringWithoutValueSizeLimit() {
String input = "testTruncateUsingStringWithoutValueSizeLimit";
String notTruncated = (String) JsonUtil.truncate(input, -1);
Assert.assertEquals(input, notTruncated);
}
@Test
public void testTruncateStringWithValueSizeLimit() {
String prefix = "12345";
int valueSizeLimit = prefix.length();
String remaining = "remaining";
String truncated = JsonUtil.truncateString(prefix + remaining, valueSizeLimit);
String expected = prefix + ", + " + String.valueOf(remaining.length()) + " more";
Assert.assertEquals(expected, truncated);
}
@Test
public void testTruncateStringWithoutValueSizeLimit() {
String input = "testTruncateStringWithoutValueSizeLimit";
String notTruncated = JsonUtil.truncateString(input, -1);
Assert.assertEquals(input, notTruncated);
}
} }

View File

@ -881,7 +881,7 @@ public abstract class AMQPMessage extends RefCountMessage implements org.apache.
map.put(propertiesPrefix + "contentEncoding", properties.getContentEncoding().toString()); map.put(propertiesPrefix + "contentEncoding", properties.getContentEncoding().toString());
} }
if (properties.getGroupId() != null) { if (properties.getGroupId() != null) {
map.put(propertiesPrefix + "groupID", properties.getGroupId()); map.put(propertiesPrefix + "groupId", properties.getGroupId());
} }
if (properties.getGroupSequence() != null) { if (properties.getGroupSequence() != null) {
map.put(propertiesPrefix + "groupSequence", properties.getGroupSequence().intValue()); map.put(propertiesPrefix + "groupSequence", properties.getGroupSequence().intValue());
@ -916,9 +916,9 @@ public abstract class AMQPMessage extends RefCountMessage implements org.apache.
map.put(prefix + "x-opt-delivery-time", deliveryTime); map.put(prefix + "x-opt-delivery-time", deliveryTime);
} else if ("x-opt-delivery-delay".equals(key) && entry.getValue() != null) { } else if ("x-opt-delivery-delay".equals(key) && entry.getValue() != null) {
long delay = ((Number) entry.getValue()).longValue(); long delay = ((Number) entry.getValue()).longValue();
map.put("x-opt-delivery-delay", delay); map.put(prefix + "x-opt-delivery-delay", delay);
} else if (AMQPMessageSupport.X_OPT_INGRESS_TIME.equals(key) && entry.getValue() != null) { } else if (AMQPMessageSupport.X_OPT_INGRESS_TIME.equals(key) && entry.getValue() != null) {
map.put("X_OPT_INGRESS_TIME", ((Number) entry.getValue()).longValue()); map.put(prefix + AMQPMessageSupport.X_OPT_INGRESS_TIME, ((Number) entry.getValue()).longValue());
} else { } else {
try { try {
map.put(prefix + key, entry.getValue()); map.put(prefix + key, entry.getValue());
@ -1878,30 +1878,25 @@ public abstract class AMQPMessage extends RefCountMessage implements org.apache.
} }
final Symbol contentType = properties != null ? properties.getContentType() : null; final Symbol contentType = properties != null ? properties.getContentType() : null;
final String contentTypeString = contentType != null ? contentType.toString() : null;
if (m.getBody() instanceof Data && contentType != null) { if (m.getBody() instanceof Data && contentType != null) {
if (AMQPMessageSupport.SERIALIZED_JAVA_OBJECT_CONTENT_TYPE.equals(contentType)) {
if (contentType.equals(AMQPMessageSupport.SERIALIZED_JAVA_OBJECT_CONTENT_TYPE)) {
type = OBJECT_TYPE; type = OBJECT_TYPE;
} else if (contentType.equals(AMQPMessageSupport.OCTET_STREAM_CONTENT_TYPE)) { } else if (AMQPMessageSupport.OCTET_STREAM_CONTENT_TYPE_SYMBOL.equals(contentType)) {
type = BYTES_TYPE; type = BYTES_TYPE;
} else { } else {
Charset charset = getCharsetForTextualContent(contentTypeString); Charset charset = getCharsetForTextualContent(contentType.toString());
if (StandardCharsets.UTF_8.equals(charset)) { if (StandardCharsets.UTF_8.equals(charset)) {
type = TEXT_TYPE; type = TEXT_TYPE;
} }
} }
} else if (m.getBody() instanceof AmqpSequence) {
type = STREAM_TYPE;
} else if (m.getBody() instanceof AmqpValue) { } else if (m.getBody() instanceof AmqpValue) {
Object value = ((AmqpValue) m.getBody()).getValue(); Object value = ((AmqpValue) m.getBody()).getValue();
if (value instanceof String) { if (value instanceof String) {
type = TEXT_TYPE; type = TEXT_TYPE;
} else if (value instanceof Binary) { } else if (value instanceof Binary) {
if (AMQPMessageSupport.SERIALIZED_JAVA_OBJECT_CONTENT_TYPE.equals(contentType)) {
if (contentType.equals(AMQPMessageSupport.SERIALIZED_JAVA_OBJECT_CONTENT_TYPE)) {
type = OBJECT_TYPE; type = OBJECT_TYPE;
} else { } else {
type = BYTES_TYPE; type = BYTES_TYPE;
@ -1911,7 +1906,10 @@ public abstract class AMQPMessage extends RefCountMessage implements org.apache.
} else if (value instanceof Map) { } else if (value instanceof Map) {
type = MAP_TYPE; type = MAP_TYPE;
} }
} else if (m.getBody() instanceof AmqpSequence) {
type = STREAM_TYPE;
} }
return type; return type;
} }
} }

View File

@ -66,6 +66,8 @@ public final class AMQPMessageSupport {
public static SimpleString HDR_ORIGINAL_ADDRESS_ANNOTATION = SimpleString.toSimpleString("x-opt-ORIG-ADDRESS"); public static SimpleString HDR_ORIGINAL_ADDRESS_ANNOTATION = SimpleString.toSimpleString("x-opt-ORIG-ADDRESS");
public static final String JMS_REPLY_TO_TYPE_MSG_ANNOTATION_SYMBOL_NAME = "x-opt-jms-reply-to"; public static final String JMS_REPLY_TO_TYPE_MSG_ANNOTATION_SYMBOL_NAME = "x-opt-jms-reply-to";
public static final String X_OPT_DELIVERY_TIME = "x-opt-delivery-time";
public static final String X_OPT_DELIVERY_DELAY = "x-opt-delivery-delay";
// Message Properties used to map AMQP to JMS and back // Message Properties used to map AMQP to JMS and back
/** /**
@ -85,12 +87,12 @@ public final class AMQPMessageSupport {
/** /**
* Attribute used to mark the Application defined delivery time assigned to the message * Attribute used to mark the Application defined delivery time assigned to the message
*/ */
public static final Symbol SCHEDULED_DELIVERY_TIME = Symbol.getSymbol("x-opt-delivery-time"); public static final Symbol SCHEDULED_DELIVERY_TIME = Symbol.getSymbol(X_OPT_DELIVERY_TIME);
/** /**
* Attribute used to mark the Application defined delivery time assigned to the message * Attribute used to mark the Application defined delivery time assigned to the message
*/ */
public static final Symbol SCHEDULED_DELIVERY_DELAY = Symbol.getSymbol("x-opt-delivery-delay"); public static final Symbol SCHEDULED_DELIVERY_DELAY = Symbol.getSymbol(X_OPT_DELIVERY_DELAY);
/** /**
* Attribute used to mark the Application defined delivery time assigned to the message * Attribute used to mark the Application defined delivery time assigned to the message
@ -204,10 +206,12 @@ public final class AMQPMessageSupport {
public static final byte TEMP_QUEUE_TYPE = 0x02; public static final byte TEMP_QUEUE_TYPE = 0x02;
public static final byte TEMP_TOPIC_TYPE = 0x03; public static final byte TEMP_TOPIC_TYPE = 0x03;
public static final String OCTET_STREAM_CONTENT_TYPE = "application/octet-stream";
/** /**
* Content type used to mark Data sections as containing arbitrary bytes. * Content type used to mark Data sections as containing arbitrary bytes.
*/ */
public static final String OCTET_STREAM_CONTENT_TYPE = "application/octet-stream"; public static final Symbol OCTET_STREAM_CONTENT_TYPE_SYMBOL = Symbol.valueOf(OCTET_STREAM_CONTENT_TYPE);
/** /**
* Lookup and return the correct Proton Symbol instance based on the given key. * Lookup and return the correct Proton Symbol instance based on the given key.

View File

@ -39,12 +39,17 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import javax.management.openmbean.CompositeData;
import javax.management.openmbean.OpenDataException;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQBuffers; import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ICoreMessage; import org.apache.activemq.artemis.api.core.ICoreMessage;
import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.message.openmbean.CompositeDataConstants;
import org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageIdHelper;
import org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport; import org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport;
import org.apache.activemq.artemis.protocol.amqp.util.NettyReadable; import org.apache.activemq.artemis.protocol.amqp.util.NettyReadable;
import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable; import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable;
@ -54,6 +59,7 @@ import org.apache.activemq.artemis.spi.core.protocol.EmbedMessageUtil;
import org.apache.activemq.artemis.utils.RandomUtil; import org.apache.activemq.artemis.utils.RandomUtil;
import org.apache.activemq.artemis.utils.collections.TypedProperties; import org.apache.activemq.artemis.utils.collections.TypedProperties;
import org.apache.qpid.proton.Proton; import org.apache.qpid.proton.Proton;
import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.Symbol; import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.UnsignedByte; import org.apache.qpid.proton.amqp.UnsignedByte;
import org.apache.qpid.proton.amqp.UnsignedInteger; import org.apache.qpid.proton.amqp.UnsignedInteger;
@ -86,12 +92,26 @@ public class AMQPMessageTest {
private static final String TEST_MESSAGE_ANNOTATION_KEY = "x-opt-test-annotation"; private static final String TEST_MESSAGE_ANNOTATION_KEY = "x-opt-test-annotation";
private static final String TEST_MESSAGE_ANNOTATION_VALUE = "test-annotation"; private static final String TEST_MESSAGE_ANNOTATION_VALUE = "test-annotation";
private static final String TEST_MESSAGE_ANNOTATION_KEY2 = "x-opt-test-annotation2";
private static final String TEST_MESSAGE_ANNOTATION_VALUE2 = "test-annotation2";
private static final String TEST_EXTRA_PROPERTY_KEY1 = "extraPropertyKey1";
private static final String TEST_EXTRA_PROPERTY_VALUE1 = "extraPropertyValue1";
private static final String TEST_EXTRA_PROPERTY_KEY2 = "extraPropertyKey2";
private static final String TEST_EXTRA_PROPERTY_VALUE2 = "extraPropertyValue2";
private static final String TEST_APPLICATION_PROPERTY_KEY = "key-1"; private static final String TEST_APPLICATION_PROPERTY_KEY = "key-1";
private static final String TEST_APPLICATION_PROPERTY_VALUE = "value-1"; private static final String TEST_APPLICATION_PROPERTY_VALUE = "value-1";
private static final String TEST_APPLICATION_PROPERTY_KEY2 = "key-2";
private static final String TEST_APPLICATION_PROPERTY_VALUE2 = "value-2";
private static final String TEST_STRING_BODY = "test-string-body"; private static final String TEST_STRING_BODY = "test-string-body";
private static final String PROPERTY_MAP_APP_PROPERTIES_PREFIX = "applicationProperties.";
private static final String PROPERTY_MAP_PROPERTIES_PREFIX = "properties.";
private static final String PROPERTY_MAP_MESSAGE_ANNOTATIONS_PREFIX = "messageAnnotations.";
private static final String PROPERTY_MAP_EXTRA_PROPERTIES_PREFIX = "extraProperties.";
private byte[] encodedProtonMessage; private byte[] encodedProtonMessage;
@Before @Before
@ -2120,6 +2140,388 @@ public class AMQPMessageTest {
assertEquals(annotationValue2, msgFromSendBuffer2.getDeliveryAnnotations().getValue().get(Symbol.getSymbol(annotationKey2))); assertEquals(annotationValue2, msgFromSendBuffer2.getDeliveryAnnotations().getValue().get(Symbol.getSymbol(annotationKey2)));
} }
//----- CompositeData handling -------------------------------------------//
@Test
public void testToCompositeDataHeaderSectionDurable() throws Exception {
MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
// With section missing (defaults false)
AMQPStandardMessage decoded = encodeAndDecodeMessage(protonMessage);
CompositeData cd = decoded.toCompositeData(0, 0);
assertTrue(cd.containsKey(CompositeDataConstants.DURABLE));
Object durableObj = cd.get(CompositeDataConstants.DURABLE);
assertTrue(durableObj instanceof Boolean);
assertEquals(Boolean.FALSE, durableObj);
// With section present, but value not set (defaults false)
Header protonHeader = new Header();
protonMessage.setHeader(protonHeader);
decoded = encodeAndDecodeMessage(protonMessage);
cd = decoded.toCompositeData(0, 0);
assertTrue(cd.containsKey(CompositeDataConstants.DURABLE));
durableObj = cd.get(CompositeDataConstants.DURABLE);
assertTrue(durableObj instanceof Boolean);
assertEquals(Boolean.FALSE, durableObj);
// With section present, value set False explicitly
protonHeader = new Header();
protonHeader.setDurable(Boolean.FALSE);
protonMessage.setHeader(protonHeader);
decoded = encodeAndDecodeMessage(protonMessage);
cd = decoded.toCompositeData(0, 0);
assertTrue(cd.containsKey(CompositeDataConstants.DURABLE));
durableObj = cd.get(CompositeDataConstants.DURABLE);
assertTrue(durableObj instanceof Boolean);
assertEquals(Boolean.FALSE, durableObj);
// With section present, value set True explicitly
protonHeader = new Header();
protonHeader.setDurable(Boolean.TRUE);
protonMessage.setHeader(protonHeader);
decoded = encodeAndDecodeMessage(protonMessage);
cd = decoded.toCompositeData(0, 0);
assertTrue(cd.containsKey(CompositeDataConstants.DURABLE));
durableObj = cd.get(CompositeDataConstants.DURABLE);
assertTrue(durableObj instanceof Boolean);
assertEquals(Boolean.TRUE, durableObj);
}
@Test
public void testToCompositeDataHeaderSectionPriority() throws Exception {
MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
// With section missing (defaults 4)
AMQPStandardMessage decoded = encodeAndDecodeMessage(protonMessage);
CompositeData cd = decoded.toCompositeData(0, 0);
assertTrue(cd.containsKey(CompositeDataConstants.PRIORITY));
Object priorityObj = cd.get(CompositeDataConstants.PRIORITY);
assertTrue(priorityObj instanceof Byte);
assertEquals(Byte.valueOf((byte) 4), priorityObj);
// With section present, but value not set (defaults 4)
Header protonHeader = new Header();
protonMessage.setHeader(protonHeader);
decoded = encodeAndDecodeMessage(protonMessage);
cd = decoded.toCompositeData(0, 0);
assertTrue(cd.containsKey(CompositeDataConstants.PRIORITY));
priorityObj = cd.get(CompositeDataConstants.PRIORITY);
assertTrue(priorityObj instanceof Byte);
assertEquals(Byte.valueOf((byte) 4), priorityObj);
// With section present, value set to 5 explicitly
protonHeader = new Header();
protonHeader.setPriority(UnsignedByte.valueOf((byte) 5));
protonMessage.setHeader(protonHeader);
decoded = encodeAndDecodeMessage(protonMessage);
cd = decoded.toCompositeData(0, 0);
assertTrue(cd.containsKey(CompositeDataConstants.PRIORITY));
priorityObj = cd.get(CompositeDataConstants.PRIORITY);
assertTrue(priorityObj instanceof Byte);
assertEquals(Byte.valueOf((byte) 5), priorityObj);
}
@Test
public void testToCompositeDataPropertiesSection() throws Exception {
MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
String testContentEncoding = "gzip";
String testGroupId = "testGroupId";
int testGroupSequence = 45678;
String testReplyToGroupId = "testReplyToGroupId";
long testCreationTime = System.currentTimeMillis();
long testExpiryTime = testCreationTime + 5000;
String testSubject = "testSubject";
String testMessageId = "testMessageId";
Properties protonProperties = new Properties();
protonProperties.setContentType(Symbol.valueOf(AMQPMessageSupport.OCTET_STREAM_CONTENT_TYPE));
protonProperties.setContentEncoding(Symbol.valueOf(testContentEncoding));
protonProperties.setGroupId(testGroupId);
protonProperties.setGroupSequence(UnsignedInteger.valueOf(testGroupSequence));
protonProperties.setReplyToGroupId(testReplyToGroupId);
protonProperties.setCreationTime(new Date(testCreationTime));
protonProperties.setAbsoluteExpiryTime(new Date(testExpiryTime));
protonProperties.setSubject(testSubject);
protonProperties.setTo(TEST_TO_ADDRESS);
protonProperties.setMessageId(testMessageId);
protonMessage.setProperties(protonProperties);
AMQPStandardMessage decoded = encodeAndDecodeMessage(protonMessage);
CompositeData cd = decoded.toCompositeData(-1, 0);
assertTrue(cd.containsKey(CompositeDataConstants.PROPERTIES));
Object propsObject = cd.get(CompositeDataConstants.PROPERTIES);
assertTrue(propsObject instanceof String);
String properties = (String) propsObject;
assertTrue(properties.contains(PROPERTY_MAP_PROPERTIES_PREFIX + "contentType" + "=" + AMQPMessageSupport.OCTET_STREAM_CONTENT_TYPE));
assertTrue(properties.contains(PROPERTY_MAP_PROPERTIES_PREFIX + "contentEncoding" + "=" + testContentEncoding));
assertTrue(properties.contains(PROPERTY_MAP_PROPERTIES_PREFIX + "groupId" + "=" + testGroupId));
assertTrue(properties.contains(PROPERTY_MAP_PROPERTIES_PREFIX + "groupSequence" + "=" + testGroupSequence));
assertTrue(properties.contains(PROPERTY_MAP_PROPERTIES_PREFIX + "replyToGroupId" + "=" + testReplyToGroupId));
assertTrue(properties.contains(PROPERTY_MAP_PROPERTIES_PREFIX + "creationTime" + "=" + testCreationTime));
assertTrue(properties.contains(PROPERTY_MAP_PROPERTIES_PREFIX + "absoluteExpiryTime" + "=" + testExpiryTime));
assertTrue(properties.contains(PROPERTY_MAP_PROPERTIES_PREFIX + "to" + "=" + TEST_TO_ADDRESS));
assertTrue(properties.contains(PROPERTY_MAP_PROPERTIES_PREFIX + "subject" + "=" + testSubject));
// TODO: should these fields be included in the 'properties' string and tested above?
// Some are shown elsewhere in a way, others missing entirely. Eg:
//
// correlation-id: not included.
// message-id: included'ish, with an ID: prefix added, as the CompositeDataConstants.USER_ID.
// reply-to: not included, though the replyToGroupId is given as shown above.
// user-id: not included.
// Some fields of the properties section already align with fields given
// their own top level entries of the CompositeData, which remain:
// The message-id is presented via the 'user id' field, inc an added prefix.
assertTrue(cd.containsKey(CompositeDataConstants.USER_ID));
Object messageIdObj = cd.get(CompositeDataConstants.USER_ID);
assertTrue(messageIdObj instanceof String);
assertEquals(AMQPMessageIdHelper.JMS_ID_PREFIX + testMessageId, messageIdObj);
// The creation-time is duplicated as the 'timestamp' field
assertTrue(cd.containsKey(CompositeDataConstants.TIMESTAMP));
Object timestampObj = cd.get(CompositeDataConstants.TIMESTAMP);
assertTrue(timestampObj instanceof Long);
assertEquals(testCreationTime, timestampObj);
}
@Test
public void testToCompositeDataApplicationPropertiesSection() throws Exception {
MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
Map<String, Object> appPropsMap = new HashMap<>();
appPropsMap.put(TEST_APPLICATION_PROPERTY_KEY, TEST_APPLICATION_PROPERTY_VALUE);
appPropsMap.put(TEST_APPLICATION_PROPERTY_KEY2, TEST_APPLICATION_PROPERTY_VALUE2);
ApplicationProperties appProps = new ApplicationProperties(appPropsMap);
protonMessage.setApplicationProperties(appProps);
AMQPStandardMessage decoded = encodeAndDecodeMessage(protonMessage);
CompositeData cd = decoded.toCompositeData(-1, 0);
assertTrue(cd.containsKey(CompositeDataConstants.PROPERTIES));
Object propsObject = cd.get(CompositeDataConstants.PROPERTIES);
assertTrue(propsObject instanceof String);
String properties = (String) propsObject;
assertTrue(properties.contains(PROPERTY_MAP_APP_PROPERTIES_PREFIX +
TEST_APPLICATION_PROPERTY_KEY + "=" + TEST_APPLICATION_PROPERTY_VALUE));
assertTrue(properties.contains(PROPERTY_MAP_APP_PROPERTIES_PREFIX +
TEST_APPLICATION_PROPERTY_KEY2 + "=" + TEST_APPLICATION_PROPERTY_VALUE2));
}
@Test
public void testToCompositeDataMessageAnnotationSection() throws Exception {
MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
Map<Symbol, Object> annotationsMap = new HashMap<>();
annotationsMap.put(Symbol.valueOf(TEST_MESSAGE_ANNOTATION_KEY), TEST_MESSAGE_ANNOTATION_VALUE);
annotationsMap.put(Symbol.valueOf(TEST_MESSAGE_ANNOTATION_KEY2), TEST_MESSAGE_ANNOTATION_VALUE2);
MessageAnnotations annotations = new MessageAnnotations(annotationsMap);
protonMessage.setMessageAnnotations(annotations);
AMQPStandardMessage decoded = encodeAndDecodeMessage(protonMessage);
CompositeData cd = decoded.toCompositeData(-1, 0);
assertTrue(cd.containsKey(CompositeDataConstants.PROPERTIES));
Object propsObject = cd.get(CompositeDataConstants.PROPERTIES);
assertTrue(propsObject instanceof String);
String properties = (String) propsObject;
assertTrue(properties.contains(PROPERTY_MAP_MESSAGE_ANNOTATIONS_PREFIX +
TEST_MESSAGE_ANNOTATION_KEY + "=" + TEST_MESSAGE_ANNOTATION_VALUE));
assertTrue(properties.contains(PROPERTY_MAP_MESSAGE_ANNOTATIONS_PREFIX +
TEST_MESSAGE_ANNOTATION_KEY2 + "=" + TEST_MESSAGE_ANNOTATION_VALUE2));
// Now try some specific annotations with their own handling
long testIngressTime = System.currentTimeMillis();
long testDeliveryTime = System.currentTimeMillis() + 5678;
long testDeliveryDelay = 6789;
annotationsMap = new HashMap<>();
annotationsMap.put(Symbol.valueOf(AMQPMessageSupport.X_OPT_INGRESS_TIME), testIngressTime);
annotationsMap.put(Symbol.valueOf(AMQPMessageSupport.X_OPT_DELIVERY_TIME), testDeliveryTime);
annotationsMap.put(Symbol.valueOf(AMQPMessageSupport.X_OPT_DELIVERY_DELAY), testDeliveryDelay);
annotations = new MessageAnnotations(annotationsMap);
protonMessage.setMessageAnnotations(annotations);
decoded = encodeAndDecodeMessage(protonMessage);
cd = decoded.toCompositeData(-1, 0);
assertTrue(cd.containsKey(CompositeDataConstants.PROPERTIES));
propsObject = cd.get(CompositeDataConstants.PROPERTIES);
assertTrue(propsObject instanceof String);
properties = (String) propsObject;
assertTrue(properties.contains(PROPERTY_MAP_MESSAGE_ANNOTATIONS_PREFIX +
AMQPMessageSupport.X_OPT_INGRESS_TIME + "=" + testIngressTime));
assertTrue(properties.contains(PROPERTY_MAP_MESSAGE_ANNOTATIONS_PREFIX +
AMQPMessageSupport.X_OPT_DELIVERY_TIME + "=" + testDeliveryTime));
assertTrue(properties.contains(PROPERTY_MAP_MESSAGE_ANNOTATIONS_PREFIX +
AMQPMessageSupport.X_OPT_DELIVERY_DELAY + "=" + testDeliveryDelay));
}
@Test
public void testToCompositeDataExtraProperties() throws Exception {
MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
TypedProperties extraProperties = new TypedProperties();
extraProperties.putProperty(new SimpleString(TEST_EXTRA_PROPERTY_KEY1), TEST_EXTRA_PROPERTY_VALUE1);
extraProperties.putProperty(new SimpleString(TEST_EXTRA_PROPERTY_KEY2), TEST_EXTRA_PROPERTY_VALUE2);
AMQPStandardMessage decoded = encodeAndDecodeMessage(protonMessage, extraProperties);
CompositeData cd = decoded.toCompositeData(-1, 0);
assertTrue(cd.containsKey(CompositeDataConstants.PROPERTIES));
Object propsObject = cd.get(CompositeDataConstants.PROPERTIES);
assertTrue(propsObject instanceof String);
String properties = (String) propsObject;
assertTrue(properties.contains(PROPERTY_MAP_EXTRA_PROPERTIES_PREFIX +
TEST_EXTRA_PROPERTY_KEY1 + "=" + TEST_EXTRA_PROPERTY_VALUE1));
assertTrue(properties.contains(PROPERTY_MAP_EXTRA_PROPERTIES_PREFIX +
TEST_EXTRA_PROPERTY_KEY2 + "=" + TEST_EXTRA_PROPERTY_VALUE2));
}
@Test
public void testToCompositeDataWithDataBodySectionWithoutContentType() throws Exception {
doToCompositeDataWithDataBodySectionTestImpl(null, org.apache.activemq.artemis.api.core.Message.BYTES_TYPE);
}
@Test
public void testToCompositeDataWithDataBodySectionWithOctetStreamContentType() throws Exception {
doToCompositeDataWithDataBodySectionTestImpl(AMQPMessageSupport.OCTET_STREAM_CONTENT_TYPE, org.apache.activemq.artemis.api.core.Message.BYTES_TYPE);
}
@Test
public void testToCompositeDataWithDataBodySectionWithTextPlainContentType() throws Exception {
doToCompositeDataWithDataBodySectionTestImpl("text/plain", org.apache.activemq.artemis.api.core.Message.TEXT_TYPE);
}
@Test
public void testToCompositeDataWithDataBodySectionWithSerializedObjectContentType() throws Exception {
doToCompositeDataWithDataBodySectionTestImpl(AMQPMessageSupport.SERIALIZED_JAVA_OBJECT_CONTENT_TYPE.toString(), org.apache.activemq.artemis.api.core.Message.OBJECT_TYPE);
}
private void doToCompositeDataWithDataBodySectionTestImpl(String contentType, byte expectedMessageType) throws OpenDataException {
Message protonMessage = Message.Factory.create();
// Not the right payload for some of the content types,but it
// doesnt matter, mainly checking type value and for lack of NPEs.
String bytesSource = "testPayloadBytes";
String expectedBodyText = "Data{" + bytesSource + "}";
Data body = new Data(new Binary(bytesSource.getBytes(StandardCharsets.UTF_8)));
protonMessage.setBody(body);
protonMessage.setContentType(contentType);
AMQPStandardMessage decoded = encodeAndDecodeMessage(protonMessage);
CompositeData cd = decoded.toCompositeData(-1, 0);
assertTrue(cd.containsKey(CompositeDataConstants.TEXT_BODY));
assertEquals(expectedBodyText, cd.get(CompositeDataConstants.TEXT_BODY));
assertTrue(cd.containsKey(CompositeDataConstants.TYPE));
assertEquals(expectedMessageType, cd.get(CompositeDataConstants.TYPE));
}
@Test
public void testToCompositeDataWithAmqpValueBinaryBodySectionWithoutContentType() throws Exception {
doToCompositeDataWithAmqpValueBodySectionWithBinaryTestImpl(null, org.apache.activemq.artemis.api.core.Message.BYTES_TYPE);
}
@Test
public void testToCompositeDataWithAmqpValueBinaryBodySectionWithSerializedObjectContentType() throws Exception {
// Shouldnt really get in this situation, not meant to use content-type without the Data body section.
doToCompositeDataWithAmqpValueBodySectionWithBinaryTestImpl(AMQPMessageSupport.SERIALIZED_JAVA_OBJECT_CONTENT_TYPE.toString(), org.apache.activemq.artemis.api.core.Message.OBJECT_TYPE);
}
private void doToCompositeDataWithAmqpValueBodySectionWithBinaryTestImpl(String contentType, byte expectedMessageType) throws OpenDataException {
Message protonMessage = Message.Factory.create();
// Not the right payload for some of the content types,but it
// doesnt matter, mainly checking type value and for lack of NPEs.
String bytesSource = "testPayloadBytes";
AmqpValue body = new AmqpValue(new Binary(bytesSource.getBytes(StandardCharsets.UTF_8)));
protonMessage.setBody(body);
protonMessage.setContentType(contentType);
AMQPStandardMessage decoded = encodeAndDecodeMessage(protonMessage);
CompositeData cd = decoded.toCompositeData(-1, 0);
assertTrue(cd.containsKey(CompositeDataConstants.TEXT_BODY));
assertEquals(bytesSource, cd.get(CompositeDataConstants.TEXT_BODY));
assertTrue(cd.containsKey(CompositeDataConstants.TYPE));
assertEquals(expectedMessageType, cd.get(CompositeDataConstants.TYPE));
}
@Test
public void testToCompositeDataWithStringBodyWithoutValueSizeLimit() throws Exception {
doToCompositeDataWithStringBodyValueSizeLimitTestImpl(-1, TEST_STRING_BODY);
}
@Test
public void testToCompositeDataWithStringBodyWithValueSizeLimit() throws Exception {
int limit = 11;
int testBodyLength = TEST_STRING_BODY.length();
assertTrue(testBodyLength > limit);
String expected = TEST_STRING_BODY.substring(0, limit) + ", + " + String.valueOf(testBodyLength - limit) + " more";
doToCompositeDataWithStringBodyValueSizeLimitTestImpl(limit, expected);
}
private void doToCompositeDataWithStringBodyValueSizeLimitTestImpl(int fieldsLimit, String expectedBodyText) throws OpenDataException {
Message protonMessage = Message.Factory.create();
protonMessage.setBody(new AmqpValue(TEST_STRING_BODY));
AMQPStandardMessage decoded = encodeAndDecodeMessage(protonMessage);
CompositeData cd = decoded.toCompositeData(fieldsLimit, 0);
assertTrue(cd.containsKey(CompositeDataConstants.TEXT_BODY));
assertEquals(expectedBodyText, cd.get(CompositeDataConstants.TEXT_BODY));
assertTrue(cd.containsKey(CompositeDataConstants.TYPE));
assertEquals(org.apache.activemq.artemis.api.core.Message.TEXT_TYPE, cd.get(CompositeDataConstants.TYPE));
}
//----- Test Support ------------------------------------------------------// //----- Test Support ------------------------------------------------------//
private MessageImpl createProtonMessage() { private MessageImpl createProtonMessage() {
@ -2483,13 +2885,17 @@ public class AMQPMessageTest {
return bytes; return bytes;
} }
private AMQPStandardMessage encodeAndDecodeMessage(MessageImpl message) { private AMQPStandardMessage encodeAndDecodeMessage(Message message) {
return encodeAndDecodeMessage(message, null);
}
private AMQPStandardMessage encodeAndDecodeMessage(Message message, TypedProperties extraProperties) {
ByteBuf nettyBuffer = Unpooled.buffer(1500); ByteBuf nettyBuffer = Unpooled.buffer(1500);
message.encode(new NettyWritable(nettyBuffer)); message.encode(new NettyWritable(nettyBuffer));
byte[] bytes = new byte[nettyBuffer.writerIndex()]; byte[] bytes = new byte[nettyBuffer.writerIndex()];
nettyBuffer.readBytes(bytes); nettyBuffer.readBytes(bytes);
return new AMQPStandardMessage(0, bytes, null); return new AMQPStandardMessage(0, bytes, extraProperties);
} }
} }