ARTEMIS-2096 Refactor AMQMessage abstraction

Major refactoring of the AMQPMessage abstraction to resolve
some issue of message corruption still present in the code and
improve the API handling of message changes and re-encoding.

Improves handling of decoding of message sections limiting the
work to only the portions needed and ensuring the state data
is always updated with what has been done.  Fixes issues of
corrupt state on copy of message or other changes in filters.
This commit is contained in:
Timothy Bish 2018-09-25 12:22:19 -04:00 committed by Clebert Suconic
parent 2453978f41
commit a851a8f93f
16 changed files with 3581 additions and 1452 deletions

View File

@ -50,7 +50,6 @@ public class AMQPMessagePersister extends MessagePersister {
SimpleString.sizeofNullableString(record.getAddressSimpleString()) + DataConstants.SIZE_LONG + DataConstants.SIZE_LONG;
}
/** Sub classes must add the first short as the protocol-id */
@Override
public void encode(ActiveMQBuffer buffer, Message record) {
@ -62,7 +61,6 @@ public class AMQPMessagePersister extends MessagePersister {
record.persist(buffer);
}
@Override
public Message decode(ActiveMQBuffer buffer, Message record) {
long id = buffer.readLong();
@ -76,5 +74,4 @@ public class AMQPMessagePersister extends MessagePersister {
}
return record;
}
}

View File

@ -63,9 +63,22 @@ public final class AMQPMessageSupport {
/**
* Attribute used to mark the Application defined delivery time assigned to the message
*
* @deprecated Use the SCHEDULED_DELIVERY_TIME value as this is not JMS specific and will be removed.
*/
@Deprecated
public static final Symbol JMS_DELIVERY_TIME = Symbol.getSymbol("x-opt-delivery-time");
/**
* Attribute used to mark the Application defined delivery time assigned to the message
*/
public static final Symbol SCHEDULED_DELIVERY_TIME = Symbol.getSymbol("x-opt-delivery-time");
/**
* Attribute used to mark the Application defined delivery time assigned to the message
*/
public static final Symbol SCHEDULED_DELIVERY_DELAY = Symbol.getSymbol("x-opt-delivery-delay");
/**
* Attribute used to mark the Application defined delivery time assigned to the message
*/
@ -226,6 +239,24 @@ public final class AMQPMessageSupport {
}
}
/**
* Check whether the content-type given matches the expect value.
*
* @param expected
* content type string to compare against or null if not expected to be set
* @param actual
* the AMQP content type symbol from the Properties section
*
* @return true if content type matches
*/
public static boolean isContentType(String expected, Symbol actual) {
if (expected == null) {
return actual == null;
} else {
return expected.equals(actual != null ? actual.toString() : actual);
}
}
/**
* @param contentType
* the contentType of the received message

View File

@ -98,23 +98,29 @@ import io.netty.buffer.PooledByteBufAllocator;
* */
public class AmqpCoreConverter {
@SuppressWarnings("unchecked")
public static ICoreMessage toCore(AMQPMessage message, CoreMessageObjectPools coreMessageObjectPools) throws Exception {
return message.toCore(coreMessageObjectPools);
}
@SuppressWarnings("unchecked")
public static ICoreMessage toCore(AMQPMessage message, CoreMessageObjectPools coreMessageObjectPools, Header header, MessageAnnotations annotations, Properties properties, ApplicationProperties applicationProperties, Section body, Footer footer) throws Exception {
final long messageId = message.getMessageID();
final Symbol contentType = properties != null ? properties.getContentType() : null;
final String contentTypeString = contentType != null ? contentType.toString() : null;
Section body = message.getProtonMessage().getBody();
ServerJMSMessage result;
if (body == null) {
if (isContentType(SERIALIZED_JAVA_OBJECT_CONTENT_TYPE.toString(), message.getProtonMessage())) {
result = createObjectMessage(message.getMessageID(), coreMessageObjectPools);
} else if (isContentType(OCTET_STREAM_CONTENT_TYPE, message.getProtonMessage()) || isContentType(null, message.getProtonMessage())) {
result = createBytesMessage(message.getMessageID(), coreMessageObjectPools);
if (isContentType(SERIALIZED_JAVA_OBJECT_CONTENT_TYPE.toString(), contentType)) {
result = createObjectMessage(messageId, coreMessageObjectPools);
} else if (isContentType(OCTET_STREAM_CONTENT_TYPE, contentType) || isContentType(null, contentType)) {
result = createBytesMessage(messageId, coreMessageObjectPools);
} else {
Charset charset = getCharsetForTextualContent(message.getProtonMessage().getContentType());
Charset charset = getCharsetForTextualContent(contentTypeString);
if (charset != null) {
result = createTextMessage(message.getMessageID(), coreMessageObjectPools);
result = createTextMessage(messageId, coreMessageObjectPools);
} else {
result = createMessage(message.getMessageID(), coreMessageObjectPools);
result = createMessage(messageId, coreMessageObjectPools);
}
}
@ -122,30 +128,30 @@ public class AmqpCoreConverter {
} else if (body instanceof Data) {
Binary payload = ((Data) body).getValue();
if (isContentType(SERIALIZED_JAVA_OBJECT_CONTENT_TYPE.toString(), message.getProtonMessage())) {
result = createObjectMessage(message.getMessageID(), payload.getArray(), payload.getArrayOffset(), payload.getLength(), coreMessageObjectPools);
} else if (isContentType(OCTET_STREAM_CONTENT_TYPE, message.getProtonMessage())) {
result = createBytesMessage(message.getMessageID(), payload.getArray(), payload.getArrayOffset(), payload.getLength(), coreMessageObjectPools);
if (isContentType(SERIALIZED_JAVA_OBJECT_CONTENT_TYPE.toString(), contentType)) {
result = createObjectMessage(messageId, payload.getArray(), payload.getArrayOffset(), payload.getLength(), coreMessageObjectPools);
} else if (isContentType(OCTET_STREAM_CONTENT_TYPE, contentType)) {
result = createBytesMessage(messageId, payload.getArray(), payload.getArrayOffset(), payload.getLength(), coreMessageObjectPools);
} else {
Charset charset = getCharsetForTextualContent(message.getProtonMessage().getContentType());
Charset charset = getCharsetForTextualContent(contentTypeString);
if (StandardCharsets.UTF_8.equals(charset)) {
ByteBuffer buf = ByteBuffer.wrap(payload.getArray(), payload.getArrayOffset(), payload.getLength());
try {
CharBuffer chars = charset.newDecoder().decode(buf);
result = createTextMessage(message.getMessageID(), String.valueOf(chars), coreMessageObjectPools);
result = createTextMessage(messageId, String.valueOf(chars), coreMessageObjectPools);
} catch (CharacterCodingException e) {
result = createBytesMessage(message.getMessageID(), payload.getArray(), payload.getArrayOffset(), payload.getLength(), coreMessageObjectPools);
result = createBytesMessage(messageId, payload.getArray(), payload.getArrayOffset(), payload.getLength(), coreMessageObjectPools);
}
} else {
result = createBytesMessage(message.getMessageID(), payload.getArray(), payload.getArrayOffset(), payload.getLength(), coreMessageObjectPools);
result = createBytesMessage(messageId, payload.getArray(), payload.getArrayOffset(), payload.getLength(), coreMessageObjectPools);
}
}
result.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_DATA);
} else if (body instanceof AmqpSequence) {
AmqpSequence sequence = (AmqpSequence) body;
ServerJMSStreamMessage m = createStreamMessage(message.getMessageID(), coreMessageObjectPools);
ServerJMSStreamMessage m = createStreamMessage(messageId, coreMessageObjectPools);
for (Object item : sequence.getValue()) {
m.writeObject(item);
}
@ -155,35 +161,35 @@ public class AmqpCoreConverter {
} else if (body instanceof AmqpValue) {
Object value = ((AmqpValue) body).getValue();
if (value == null || value instanceof String) {
result = createTextMessage(message.getMessageID(), (String) value, coreMessageObjectPools);
result = createTextMessage(messageId, (String) value, coreMessageObjectPools);
result.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, value == null ? AMQP_VALUE_NULL : AMQP_VALUE_STRING);
} else if (value instanceof Binary) {
Binary payload = (Binary) value;
if (isContentType(SERIALIZED_JAVA_OBJECT_CONTENT_TYPE.toString(), message.getProtonMessage())) {
result = createObjectMessage(message.getMessageID(), payload, coreMessageObjectPools);
if (isContentType(SERIALIZED_JAVA_OBJECT_CONTENT_TYPE.toString(), contentType)) {
result = createObjectMessage(messageId, payload, coreMessageObjectPools);
} else {
result = createBytesMessage(message.getMessageID(), payload.getArray(), payload.getArrayOffset(), payload.getLength(), coreMessageObjectPools);
result = createBytesMessage(messageId, payload.getArray(), payload.getArrayOffset(), payload.getLength(), coreMessageObjectPools);
}
result.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_VALUE_BINARY);
} else if (value instanceof List) {
ServerJMSStreamMessage m = createStreamMessage(message.getMessageID(), coreMessageObjectPools);
ServerJMSStreamMessage m = createStreamMessage(messageId, coreMessageObjectPools);
for (Object item : (List<Object>) value) {
m.writeObject(item);
}
result = m;
result.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_VALUE_LIST);
} else if (value instanceof Map) {
result = createMapMessage(message.getMessageID(), (Map<String, Object>) value, coreMessageObjectPools);
result = createMapMessage(messageId, (Map<String, Object>) value, coreMessageObjectPools);
result.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_VALUE_MAP);
} else {
ByteBuf buf = PooledByteBufAllocator.DEFAULT.heapBuffer(1024);
try {
TLSEncode.getEncoder().setByteBuffer(new NettyWritable(buf));
TLSEncode.getEncoder().writeObject(body);
result = createBytesMessage(message.getMessageID(), buf.array(), 0, buf.writerIndex(), coreMessageObjectPools);
result = createBytesMessage(messageId, buf.array(), 0, buf.writerIndex(), coreMessageObjectPools);
} finally {
buf.release();
TLSEncode.getEncoder().setByteBuffer((WritableBuffer)null);
@ -193,30 +199,38 @@ public class AmqpCoreConverter {
throw new RuntimeException("Unexpected body type: " + body.getClass());
}
TypedProperties properties = message.getExtraProperties();
if (properties != null) {
for (SimpleString str : properties.getPropertyNames()) {
if (str.equals(AMQPMessage.ADDRESS_PROPERTY)) {
continue;
}
result.getInnerMessage().putObjectProperty(str, properties.getProperty(str));
processHeader(result, header);
processMessageAnnotations(result, annotations);
processApplicationProperties(result, applicationProperties);
processProperties(result, properties);
processFooter(result, footer);
processExtraProperties(result, message.getExtraProperties());
// If the JMS expiration has not yet been set...
if (header != null && result.getJMSExpiration() == 0) {
// Then lets try to set it based on the message TTL.
long ttl = javax.jms.Message.DEFAULT_TIME_TO_LIVE;
if (header.getTtl() != null) {
ttl = header.getTtl().longValue();
}
if (ttl == 0) {
result.setJMSExpiration(0);
} else {
result.setJMSExpiration(System.currentTimeMillis() + ttl);
}
}
populateMessage(result, message.getProtonMessage());
result.getInnerMessage().setReplyTo(message.getReplyTo());
result.getInnerMessage().setDurable(message.isDurable());
result.getInnerMessage().setPriority(message.getPriority());
result.getInnerMessage().setAddress(message.getAddressSimpleString());
result.encode();
return result != null ? result.getInnerMessage() : null;
return result.getInnerMessage();
}
@SuppressWarnings("unchecked")
protected static ServerJMSMessage populateMessage(ServerJMSMessage jms, org.apache.qpid.proton.message.Message amqp) throws Exception {
Header header = amqp.getHeader();
protected static ServerJMSMessage processHeader(ServerJMSMessage jms, Header header) throws Exception {
if (header != null) {
jms.setBooleanProperty(JMS_AMQP_HEADER, true);
@ -248,9 +262,12 @@ public class AmqpCoreConverter {
jms.setJMSDeliveryMode(DeliveryMode.NON_PERSISTENT);
}
final MessageAnnotations ma = amqp.getMessageAnnotations();
if (ma != null) {
for (Map.Entry<?, ?> entry : ma.getValue().entrySet()) {
return jms;
}
protected static ServerJMSMessage processMessageAnnotations(ServerJMSMessage jms, MessageAnnotations annotations) throws Exception {
if (annotations != null && annotations.getValue() != null) {
for (Map.Entry<?, ?> entry : annotations.getValue().entrySet()) {
String key = entry.getKey().toString();
if ("x-opt-delivery-time".equals(key) && entry.getValue() != null) {
long deliveryTime = ((Number) entry.getValue()).longValue();
@ -266,14 +283,33 @@ public class AmqpCoreConverter {
}
}
final ApplicationProperties ap = amqp.getApplicationProperties();
if (ap != null) {
for (Map.Entry<String, Object> entry : (Set<Map.Entry<String, Object>>) ap.getValue().entrySet()) {
return jms;
}
private static ServerJMSMessage processApplicationProperties(ServerJMSMessage jms, ApplicationProperties properties) throws Exception {
if (properties != null && properties.getValue() != null) {
for (Map.Entry<String, Object> entry : (Set<Map.Entry<String, Object>>) properties.getValue().entrySet()) {
setProperty(jms, entry.getKey(), entry.getValue());
}
}
final Properties properties = amqp.getProperties();
return jms;
}
private static ServerJMSMessage processExtraProperties(ServerJMSMessage jms, TypedProperties properties) {
if (properties != null) {
for (SimpleString str : properties.getPropertyNames()) {
if (str.equals(AMQPMessage.ADDRESS_PROPERTY)) {
continue;
}
jms.getInnerMessage().putObjectProperty(str, properties.getProperty(str));
}
}
return jms;
}
private static ServerJMSMessage processProperties(ServerJMSMessage jms, Properties properties) throws Exception {
if (properties != null) {
if (properties.getMessageId() != null) {
jms.setJMSMessageID(AMQPMessageIdHelper.INSTANCE.toMessageIdString(properties.getMessageId()));
@ -317,24 +353,13 @@ public class AmqpCoreConverter {
}
}
// If the jms expiration has not yet been set...
if (header != null && jms.getJMSExpiration() == 0) {
// Then lets try to set it based on the message ttl.
long ttl = javax.jms.Message.DEFAULT_TIME_TO_LIVE;
if (header.getTtl() != null) {
ttl = header.getTtl().longValue();
}
return jms;
}
if (ttl == 0) {
jms.setJMSExpiration(0);
} else {
jms.setJMSExpiration(System.currentTimeMillis() + ttl);
}
}
final Footer fp = amqp.getFooter();
if (fp != null) {
for (Map.Entry<Object, Object> entry : (Set<Map.Entry<Object, Object>>) fp.getValue().entrySet()) {
@SuppressWarnings("unchecked")
private static ServerJMSMessage processFooter(ServerJMSMessage jms, Footer footer) throws Exception {
if (footer != null && footer.getValue() != null) {
for (Map.Entry<Object, Object> entry : (Set<Map.Entry<Object, Object>>) footer.getValue().entrySet()) {
String key = entry.getKey().toString();
setProperty(jms, JMS_AMQP_FOOTER_PREFIX + key, entry.getValue());
}

View File

@ -117,11 +117,9 @@ public class NettyWritable implements WritableBuffer {
@Override
public void put(ReadableBuffer buffer) {
if (buffer.hasArray()) {
nettyBuffer.writeBytes(buffer.array(), buffer.arrayOffset(), buffer.remaining());
nettyBuffer.writeBytes(buffer.array(), buffer.arrayOffset() + buffer.position(), buffer.remaining());
} else {
while (buffer.hasRemaining()) {
nettyBuffer.writeByte(buffer.get());
}
nettyBuffer.writeBytes(buffer.byteBuffer());
}
}
}

View File

@ -29,6 +29,8 @@ import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMapMessa
import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMessage;
import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSStreamMessage;
import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSTextMessage;
import org.apache.activemq.artemis.protocol.amqp.util.NettyReadable;
import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable;
import org.apache.activemq.artemis.utils.collections.TypedProperties;
import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.messaging.AmqpSequence;
@ -40,6 +42,8 @@ import org.apache.qpid.proton.message.impl.MessageImpl;
import org.junit.Assert;
import org.junit.Test;
import io.netty.buffer.Unpooled;
public class TestConversions extends Assert {
@ -58,12 +62,11 @@ public class TestConversions extends Assert {
message.setBody(new AmqpValue(new Boolean(true)));
AMQPMessage encodedMessage = new AMQPMessage(message);
AMQPMessage encodedMessage = encodeAndCreateAMQPMessage(message);
ICoreMessage serverMessage = encodedMessage.toCore();
verifyProperties(ServerJMSMessage.wrapCoreMessage(serverMessage));
}
@Test
@ -81,7 +84,7 @@ public class TestConversions extends Assert {
message.setBody(new Data(new Binary(bodyBytes)));
AMQPMessage encodedMessage = new AMQPMessage(message);
AMQPMessage encodedMessage = encodeAndCreateAMQPMessage(message);
ICoreMessage serverMessage = encodedMessage.toCore();
@ -96,7 +99,6 @@ public class TestConversions extends Assert {
bytesMessage.readBytes(newBodyBytes);
Assert.assertArrayEquals(bodyBytes, newBodyBytes);
}
private void verifyProperties(javax.jms.Message message) throws Exception {
@ -135,7 +137,7 @@ public class TestConversions extends Assert {
message.setBody(new AmqpValue(mapValues));
AMQPMessage encodedMessage = new AMQPMessage(message);
AMQPMessage encodedMessage = encodeAndCreateAMQPMessage(message);
ICoreMessage serverMessage = encodedMessage.toCore();
serverMessage.getReadOnlyBodyBuffer();
@ -145,11 +147,11 @@ public class TestConversions extends Assert {
verifyProperties(mapMessage);
Assert.assertEquals(1, mapMessage.getInt("someint"));
Assert.assertEquals("value", mapMessage.getString("somestr"));
assertEquals(1, mapMessage.getInt("someint"));
assertEquals("value", mapMessage.getString("somestr"));
AMQPMessage newAMQP = CoreAmqpConverter.fromCore(mapMessage.getInnerMessage());
System.out.println(newAMQP.getProtonMessage().getBody());
assertNotNull(newAMQP.getBody());
}
@Test
@ -165,7 +167,7 @@ public class TestConversions extends Assert {
message.setBody(new AmqpSequence(objects));
AMQPMessage encodedMessage = new AMQPMessage(message);
AMQPMessage encodedMessage = encodeAndCreateAMQPMessage(message);
ICoreMessage serverMessage = encodedMessage.toCore();
@ -189,7 +191,7 @@ public class TestConversions extends Assert {
String text = "someText";
message.setBody(new AmqpValue(text));
AMQPMessage encodedMessage = new AMQPMessage(message);
AMQPMessage encodedMessage = encodeAndCreateAMQPMessage(message);
ICoreMessage serverMessage = encodedMessage.toCore();
@ -198,8 +200,7 @@ public class TestConversions extends Assert {
verifyProperties(textMessage);
Assert.assertEquals(text, textMessage.getText());
assertEquals(text, textMessage.getText());
}
@Test
@ -209,7 +210,7 @@ public class TestConversions extends Assert {
String text = "someText";
message.setBody(new AmqpValue(text));
AMQPMessage encodedMessage = new AMQPMessage(message);
AMQPMessage encodedMessage = encodeAndCreateAMQPMessage(message);
TypedProperties extraProperties = createTypedPropertiesMap();
extraProperties.putBytesProperty(new SimpleString("bytesProp"), "value".getBytes());
encodedMessage.setExtraProperties(extraProperties);
@ -222,8 +223,15 @@ public class TestConversions extends Assert {
verifyProperties(textMessage);
assertEquals("value", new String(((byte[]) textMessage.getObjectProperty("bytesProp"))));
Assert.assertEquals(text, textMessage.getText());
assertEquals(text, textMessage.getText());
}
private AMQPMessage encodeAndCreateAMQPMessage(MessageImpl message) {
NettyWritable encoded = new NettyWritable(Unpooled.buffer(1024));
message.encode(encoded);
NettyReadable readable = new NettyReadable(encoded.getByteBuf());
return new AMQPMessage(AMQPMessage.DEFAULT_MESSAGE_FORMAT, readable, null, null);
}
}

View File

@ -16,12 +16,11 @@
*/
package org.apache.activemq.artemis.protocol.amqp.converter.message;
import javax.jms.Destination;
import javax.jms.Queue;
import javax.jms.TemporaryQueue;
import javax.jms.TemporaryTopic;
import javax.jms.TextMessage;
import javax.jms.Topic;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
@ -30,6 +29,13 @@ import java.util.List;
import java.util.Map;
import java.util.UUID;
import javax.jms.Destination;
import javax.jms.Queue;
import javax.jms.TemporaryQueue;
import javax.jms.TemporaryTopic;
import javax.jms.TextMessage;
import javax.jms.Topic;
import org.apache.activemq.artemis.api.core.ICoreMessage;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
import org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport;
@ -39,7 +45,8 @@ import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMessage;
import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSObjectMessage;
import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSStreamMessage;
import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSTextMessage;
import org.apache.qpid.proton.Proton;
import org.apache.activemq.artemis.protocol.amqp.util.NettyReadable;
import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable;
import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.AmqpSequence;
@ -47,13 +54,11 @@ import org.apache.qpid.proton.amqp.messaging.AmqpValue;
import org.apache.qpid.proton.amqp.messaging.Data;
import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
import org.apache.qpid.proton.message.Message;
import org.apache.qpid.proton.message.impl.MessageImpl;
import org.junit.Before;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import io.netty.buffer.Unpooled;
public class JMSMappingInboundTransformerTest {
@ -72,10 +77,10 @@ public class JMSMappingInboundTransformerTest {
*/
@Test
public void testCreateBytesMessageFromNoBodySectionAndContentType() throws Exception {
Message message = Message.Factory.create();
MessageImpl message = (MessageImpl) Message.Factory.create();
message.setContentType(AMQPMessageSupport.OCTET_STREAM_CONTENT_TYPE);
AMQPMessage messageEncode = new AMQPMessage(message);
AMQPMessage messageEncode = encodeAndCreateAMQPMessage(message);
ICoreMessage coreMessage = messageEncode.toCore();
@ -94,9 +99,9 @@ public class JMSMappingInboundTransformerTest {
*/
@Test
public void testCreateBytesMessageFromNoBodySectionAndNoContentType() throws Exception {
Message message = Message.Factory.create();
MessageImpl message = (MessageImpl) Message.Factory.create();
javax.jms.Message jmsMessage = ServerJMSMessage.wrapCoreMessage(new AMQPMessage(message).toCore());
javax.jms.Message jmsMessage = ServerJMSMessage.wrapCoreMessage(encodeAndCreateAMQPMessage(message).toCore());
assertNotNull("Message should not be null", jmsMessage);
assertEquals("Unexpected message class type", ServerJMSBytesMessage.class, jmsMessage.getClass());
@ -104,10 +109,10 @@ public class JMSMappingInboundTransformerTest {
@Test
public void testCreateTextMessageFromNoBodySectionAndContentType() throws Exception {
Message message = Message.Factory.create();
MessageImpl message = (MessageImpl) Message.Factory.create();
message.setContentType("text/plain");
javax.jms.Message jmsMessage = ServerJMSMessage.wrapCoreMessage(new AMQPMessage(message).toCore());
javax.jms.Message jmsMessage = ServerJMSMessage.wrapCoreMessage(encodeAndCreateAMQPMessage(message).toCore());
assertNotNull("Message should not be null", jmsMessage);
assertEquals("Unexpected message class type", ServerJMSTextMessage.class, jmsMessage.getClass());
@ -125,12 +130,13 @@ public class JMSMappingInboundTransformerTest {
*/
@Test
public void testCreateBytesMessageFromDataWithEmptyBinaryAndContentType() throws Exception {
Message message = Proton.message();
MessageImpl message = (MessageImpl) Message.Factory.create();
Binary binary = new Binary(new byte[0]);
message.setBody(new Data(binary));
message.setContentType(AMQPMessageSupport.OCTET_STREAM_CONTENT_TYPE);
javax.jms.Message jmsMessage = ServerJMSMessage.wrapCoreMessage(new AMQPMessage(message).toCore());
AMQPMessage amqp = encodeAndCreateAMQPMessage(message);
javax.jms.Message jmsMessage = ServerJMSMessage.wrapCoreMessage(amqp.toCore());
assertNotNull("Message should not be null", jmsMessage);
assertEquals("Unexpected message class type", ServerJMSBytesMessage.class, jmsMessage.getClass());
@ -145,12 +151,12 @@ public class JMSMappingInboundTransformerTest {
* if an error occurs during the test.
*/
public void testCreateBytesMessageFromDataWithUnknownContentType() throws Exception {
Message message = Proton.message();
MessageImpl message = (MessageImpl) Message.Factory.create();
Binary binary = new Binary(new byte[0]);
message.setBody(new Data(binary));
message.setContentType("unknown-content-type");
javax.jms.Message jmsMessage = ServerJMSMessage.wrapCoreMessage(new AMQPMessage(message).toCore());
javax.jms.Message jmsMessage = ServerJMSMessage.wrapCoreMessage(encodeAndCreateAMQPMessage(message).toCore());
assertNotNull("Message should not be null", jmsMessage);
assertEquals("Unexpected message class type", ServerJMSBytesMessage.class, jmsMessage.getClass());
@ -165,13 +171,13 @@ public class JMSMappingInboundTransformerTest {
*/
@Test
public void testCreateBytesMessageFromDataWithEmptyBinaryAndNoContentType() throws Exception {
Message message = Proton.message();
MessageImpl message = (MessageImpl) Message.Factory.create();
Binary binary = new Binary(new byte[0]);
message.setBody(new Data(binary));
assertNull(message.getContentType());
javax.jms.Message jmsMessage = ServerJMSMessage.wrapCoreMessage(new AMQPMessage(message).toCore());
javax.jms.Message jmsMessage = ServerJMSMessage.wrapCoreMessage(encodeAndCreateAMQPMessage(message).toCore());
assertNotNull("Message should not be null", jmsMessage);
assertEquals("Unexpected message class type", ServerJMSBytesMessage.class, jmsMessage.getClass());
@ -187,12 +193,12 @@ public class JMSMappingInboundTransformerTest {
*/
@Test
public void testCreateObjectMessageFromDataWithContentTypeAndEmptyBinary() throws Exception {
Message message = Proton.message();
MessageImpl message = (MessageImpl) Message.Factory.create();
Binary binary = new Binary(new byte[0]);
message.setBody(new Data(binary));
message.setContentType(AMQPMessageSupport.SERIALIZED_JAVA_OBJECT_CONTENT_TYPE.toString());
javax.jms.Message jmsMessage = ServerJMSMessage.wrapCoreMessage(new AMQPMessage(message).toCore());
javax.jms.Message jmsMessage = ServerJMSMessage.wrapCoreMessage(encodeAndCreateAMQPMessage(message).toCore());
assertNotNull("Message should not be null", jmsMessage);
assertEquals("Unexpected message class type", ServerJMSObjectMessage.class, jmsMessage.getClass());
@ -287,12 +293,12 @@ public class JMSMappingInboundTransformerTest {
}
private void doCreateTextMessageFromDataWithContentTypeTestImpl(String contentType, Charset expectedCharset) throws Exception {
Message message = Proton.message();
MessageImpl message = (MessageImpl) Message.Factory.create();
Binary binary = new Binary(new byte[0]);
message.setBody(new Data(binary));
message.setContentType(contentType);
javax.jms.Message jmsMessage = ServerJMSMessage.wrapCoreMessage(new AMQPMessage(message).toCore());
javax.jms.Message jmsMessage = ServerJMSMessage.wrapCoreMessage(encodeAndCreateAMQPMessage(message).toCore());
assertNotNull("Message should not be null", jmsMessage);
if (StandardCharsets.UTF_8.equals(expectedCharset)) {
@ -313,10 +319,10 @@ public class JMSMappingInboundTransformerTest {
*/
@Test
public void testCreateTextMessageFromAmqpValueWithString() throws Exception {
Message message = Proton.message();
MessageImpl message = (MessageImpl) Message.Factory.create();
message.setBody(new AmqpValue("content"));
javax.jms.Message jmsMessage = ServerJMSMessage.wrapCoreMessage(new AMQPMessage(message).toCore());
javax.jms.Message jmsMessage = ServerJMSMessage.wrapCoreMessage(encodeAndCreateAMQPMessage(message).toCore());
assertNotNull("Message should not be null", jmsMessage);
assertEquals("Unexpected message class type", ServerJMSTextMessage.class, jmsMessage.getClass());
@ -331,10 +337,10 @@ public class JMSMappingInboundTransformerTest {
*/
@Test
public void testCreateTextMessageFromAmqpValueWithNull() throws Exception {
Message message = Proton.message();
MessageImpl message = (MessageImpl) Message.Factory.create();
message.setBody(new AmqpValue(null));
javax.jms.Message jmsMessage = ServerJMSMessage.wrapCoreMessage(new AMQPMessage(message).toCore());
javax.jms.Message jmsMessage = ServerJMSMessage.wrapCoreMessage(encodeAndCreateAMQPMessage(message).toCore());
assertNotNull("Message should not be null", jmsMessage);
assertEquals("Unexpected message class type", ServerJMSTextMessage.class, jmsMessage.getClass());
@ -350,11 +356,11 @@ public class JMSMappingInboundTransformerTest {
*/
@Test
public void testCreateObjectMessageFromAmqpValueWithBinaryAndContentType() throws Exception {
Message message = Message.Factory.create();
MessageImpl message = (MessageImpl) Message.Factory.create();
message.setBody(new AmqpValue(new Binary(new byte[0])));
message.setContentType(AMQPMessageSupport.SERIALIZED_JAVA_OBJECT_CONTENT_TYPE.toString());
javax.jms.Message jmsMessage = ServerJMSMessage.wrapCoreMessage(new AMQPMessage(message).toCore());
javax.jms.Message jmsMessage = ServerJMSMessage.wrapCoreMessage(encodeAndCreateAMQPMessage(message).toCore());
assertNotNull("Message should not be null", jmsMessage);
assertEquals("Unexpected message class type", ServerJMSObjectMessage.class, jmsMessage.getClass());
@ -369,11 +375,11 @@ public class JMSMappingInboundTransformerTest {
*/
@Test
public void testCreateAmqpMapMessageFromAmqpValueWithMap() throws Exception {
Message message = Proton.message();
MessageImpl message = (MessageImpl) Message.Factory.create();
Map<String, String> map = new HashMap<>();
message.setBody(new AmqpValue(map));
javax.jms.Message jmsMessage = ServerJMSMessage.wrapCoreMessage(new AMQPMessage(message).toCore());
javax.jms.Message jmsMessage = ServerJMSMessage.wrapCoreMessage(encodeAndCreateAMQPMessage(message).toCore());
assertNotNull("Message should not be null", jmsMessage);
assertEquals("Unexpected message class type", ServerJMSMapMessage.class, jmsMessage.getClass());
@ -388,11 +394,11 @@ public class JMSMappingInboundTransformerTest {
*/
@Test
public void testCreateAmqpStreamMessageFromAmqpValueWithList() throws Exception {
Message message = Proton.message();
MessageImpl message = (MessageImpl) Message.Factory.create();
List<String> list = new ArrayList<>();
message.setBody(new AmqpValue(list));
javax.jms.Message jmsMessage = ServerJMSMessage.wrapCoreMessage(new AMQPMessage(message).toCore());
javax.jms.Message jmsMessage = ServerJMSMessage.wrapCoreMessage(encodeAndCreateAMQPMessage(message).toCore());
assertNotNull("Message should not be null", jmsMessage);
assertEquals("Unexpected message class type", ServerJMSStreamMessage.class, jmsMessage.getClass());
@ -407,11 +413,11 @@ public class JMSMappingInboundTransformerTest {
*/
@Test
public void testCreateAmqpStreamMessageFromAmqpSequence() throws Exception {
Message message = Proton.message();
MessageImpl message = (MessageImpl) Message.Factory.create();
List<String> list = new ArrayList<>();
message.setBody(new AmqpSequence(list));
javax.jms.Message jmsMessage = ServerJMSMessage.wrapCoreMessage(new AMQPMessage(message).toCore());
javax.jms.Message jmsMessage = ServerJMSMessage.wrapCoreMessage(encodeAndCreateAMQPMessage(message).toCore());
assertNotNull("Message should not be null", jmsMessage);
assertEquals("Unexpected message class type", ServerJMSStreamMessage.class, jmsMessage.getClass());
@ -426,11 +432,11 @@ public class JMSMappingInboundTransformerTest {
*/
@Test
public void testCreateAmqpBytesMessageFromAmqpValueWithBinary() throws Exception {
Message message = Proton.message();
MessageImpl message = (MessageImpl) Message.Factory.create();
Binary binary = new Binary(new byte[0]);
message.setBody(new AmqpValue(binary));
javax.jms.Message jmsMessage = ServerJMSMessage.wrapCoreMessage(new AMQPMessage(message).toCore());
javax.jms.Message jmsMessage = ServerJMSMessage.wrapCoreMessage(encodeAndCreateAMQPMessage(message).toCore());
assertNotNull("Message should not be null", jmsMessage);
assertEquals("Unexpected message class type", ServerJMSBytesMessage.class, jmsMessage.getClass());
@ -446,10 +452,10 @@ public class JMSMappingInboundTransformerTest {
*/
@Test
public void testCreateBytesMessageFromAmqpValueWithUncategorisedContent() throws Exception {
Message message = Proton.message();
MessageImpl message = (MessageImpl) Message.Factory.create();
message.setBody(new AmqpValue(UUID.randomUUID()));
javax.jms.Message jmsMessage = ServerJMSMessage.wrapCoreMessage(new AMQPMessage(message).toCore());
javax.jms.Message jmsMessage = ServerJMSMessage.wrapCoreMessage(encodeAndCreateAMQPMessage(message).toCore());
assertNotNull("Message should not be null", jmsMessage);
assertEquals("Unexpected message class type", ServerJMSBytesMessage.class, jmsMessage.getClass());
@ -458,10 +464,10 @@ public class JMSMappingInboundTransformerTest {
@Test
public void testTransformMessageWithAmqpValueStringCreatesTextMessage() throws Exception {
String contentString = "myTextMessageContent";
Message message = Message.Factory.create();
MessageImpl message = (MessageImpl) Message.Factory.create();
message.setBody(new AmqpValue(contentString));
ServerJMSTextMessage jmsMessage = (ServerJMSTextMessage)ServerJMSMessage.wrapCoreMessage(new AMQPMessage(message).toCore());
ServerJMSTextMessage jmsMessage = (ServerJMSTextMessage)ServerJMSMessage.wrapCoreMessage(encodeAndCreateAMQPMessage(message).toCore());
jmsMessage.decode();
assertTrue("Expected TextMessage", jmsMessage instanceof TextMessage);
@ -504,17 +510,17 @@ public class JMSMappingInboundTransformerTest {
throws Exception {
String toAddress = "toAddress";
Message amqp = Message.Factory.create();
amqp.setBody(new AmqpValue("myTextMessageContent"));
amqp.setAddress(toAddress);
MessageImpl message = (MessageImpl) Message.Factory.create();
message.setBody(new AmqpValue("myTextMessageContent"));
message.setAddress(toAddress);
if (toTypeAnnotationValue != null) {
Map<Symbol, Object> map = new HashMap<>();
map.put(Symbol.valueOf("x-opt-to-type"), toTypeAnnotationValue);
MessageAnnotations ma = new MessageAnnotations(map);
amqp.setMessageAnnotations(ma);
message.setMessageAnnotations(ma);
}
javax.jms.Message jmsMessage = ServerJMSMessage.wrapCoreMessage(new AMQPMessage(amqp).toCore());
javax.jms.Message jmsMessage = ServerJMSMessage.wrapCoreMessage(encodeAndCreateAMQPMessage(message).toCore());
assertTrue("Expected TextMessage", jmsMessage instanceof TextMessage);
}
@ -549,18 +555,26 @@ public class JMSMappingInboundTransformerTest {
throws Exception {
String replyToAddress = "replyToAddress";
Message amqp = Message.Factory.create();
amqp.setBody(new AmqpValue("myTextMessageContent"));
amqp.setReplyTo(replyToAddress);
MessageImpl message = (MessageImpl) Message.Factory.create();
message.setBody(new AmqpValue("myTextMessageContent"));
message.setReplyTo(replyToAddress);
if (replyToTypeAnnotationValue != null) {
Map<Symbol, Object> map = new HashMap<>();
map.put(Symbol.valueOf("x-opt-reply-type"), replyToTypeAnnotationValue);
MessageAnnotations ma = new MessageAnnotations(map);
amqp.setMessageAnnotations(ma);
message.setMessageAnnotations(ma);
}
javax.jms.Message jmsMessage = ServerJMSMessage.wrapCoreMessage(new AMQPMessage(amqp).toCore());
javax.jms.Message jmsMessage = ServerJMSMessage.wrapCoreMessage(encodeAndCreateAMQPMessage(message).toCore());
assertTrue("Expected TextMessage", jmsMessage instanceof TextMessage);
}
private AMQPMessage encodeAndCreateAMQPMessage(MessageImpl message) {
NettyWritable encoded = new NettyWritable(Unpooled.buffer(1024));
message.encode(encoded);
NettyReadable readable = new NettyReadable(encoded.getByteBuf());
return new AMQPMessage(AMQPMessage.DEFAULT_MESSAGE_FORMAT, readable, null, null);
}
}

View File

@ -44,6 +44,7 @@ import javax.jms.JMSException;
import org.apache.activemq.artemis.core.buffers.impl.ResetLimitWrappedActiveMQBuffer;
import org.apache.activemq.artemis.core.message.impl.CoreMessage;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
import org.apache.activemq.artemis.protocol.amqp.converter.AMQPConverter;
import org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport;
import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerDestination;
@ -59,7 +60,6 @@ import org.apache.qpid.proton.amqp.messaging.AmqpSequence;
import org.apache.qpid.proton.amqp.messaging.AmqpValue;
import org.apache.qpid.proton.amqp.messaging.Data;
import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
import org.apache.qpid.proton.message.Message;
import org.junit.Test;
public class JMSMappingOutboundTransformerTest {
@ -79,7 +79,7 @@ public class JMSMappingOutboundTransformerTest {
ServerJMSMessage outbound = createMessage();
outbound.encode();
Message amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage()).getProtonMessage();
AMQPMessage amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage());
assertNull(amqp.getBody());
}
@ -90,7 +90,7 @@ public class JMSMappingOutboundTransformerTest {
outbound.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_NULL);
outbound.encode();
Message amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage()).getProtonMessage();
AMQPMessage amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage());
assertNull(amqp.getBody());
}
@ -104,7 +104,7 @@ public class JMSMappingOutboundTransformerTest {
outbound.writeBytes(expectedPayload);
outbound.encode();
Message amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage()).getProtonMessage();
AMQPMessage amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage());
assertNotNull(amqp.getBody());
assertTrue(amqp.getBody() instanceof Data);
@ -123,7 +123,7 @@ public class JMSMappingOutboundTransformerTest {
outbound.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_VALUE_BINARY);
outbound.encode();
Message amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage()).getProtonMessage();
AMQPMessage amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage());
assertNotNull(amqp.getBody());
assertTrue(amqp.getBody() instanceof AmqpValue);
@ -139,7 +139,7 @@ public class JMSMappingOutboundTransformerTest {
outbound.writeBytes(expectedPayload);
outbound.encode();
Message amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage()).getProtonMessage();
AMQPMessage amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage());
assertNotNull(amqp.getBody());
assertTrue(amqp.getBody() instanceof AmqpValue);
@ -159,7 +159,7 @@ public class JMSMappingOutboundTransformerTest {
ServerJMSMapMessage outbound = createMapMessage();
outbound.encode();
Message amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage()).getProtonMessage();
AMQPMessage amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage());
assertNotNull(amqp.getBody());
assertTrue(amqp.getBody() instanceof AmqpValue);
@ -174,7 +174,7 @@ public class JMSMappingOutboundTransformerTest {
outbound.setBytes("bytes", byteArray);
outbound.encode();
Message amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage()).getProtonMessage();
AMQPMessage amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage());
assertNotNull(amqp.getBody());
assertTrue(amqp.getBody() instanceof AmqpValue);
@ -196,7 +196,7 @@ public class JMSMappingOutboundTransformerTest {
outbound.setBoolean("property-3", true);
outbound.encode();
Message amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage()).getProtonMessage();
AMQPMessage amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage());
assertNotNull(amqp.getBody());
assertTrue(amqp.getBody() instanceof AmqpValue);
@ -218,7 +218,7 @@ public class JMSMappingOutboundTransformerTest {
outbound.writeString("test");
outbound.encode();
Message amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage()).getProtonMessage();
AMQPMessage amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage());
assertNotNull(amqp.getBody());
assertTrue(amqp.getBody() instanceof AmqpValue);
@ -239,7 +239,7 @@ public class JMSMappingOutboundTransformerTest {
outbound.writeString("test");
outbound.encode();
Message amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage()).getProtonMessage();
AMQPMessage amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage());
assertNotNull(amqp.getBody());
assertTrue(amqp.getBody() instanceof AmqpValue);
@ -260,7 +260,7 @@ public class JMSMappingOutboundTransformerTest {
outbound.writeString("test");
outbound.encode();
Message amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage()).getProtonMessage();
AMQPMessage amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage());
assertNotNull(amqp.getBody());
assertTrue(amqp.getBody() instanceof AmqpSequence);
@ -279,7 +279,7 @@ public class JMSMappingOutboundTransformerTest {
ServerJMSObjectMessage outbound = createObjectMessage();
outbound.encode();
Message amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage()).getProtonMessage();
AMQPMessage amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage());
assertNotNull(amqp.getBody());
assertTrue(amqp.getBody() instanceof Data);
@ -292,7 +292,7 @@ public class JMSMappingOutboundTransformerTest {
outbound.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_UNKNOWN);
outbound.encode();
Message amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage()).getProtonMessage();
AMQPMessage amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage());
assertNotNull(amqp.getBody());
assertTrue(amqp.getBody() instanceof Data);
@ -304,7 +304,7 @@ public class JMSMappingOutboundTransformerTest {
ServerJMSObjectMessage outbound = createObjectMessage(TEST_OBJECT_VALUE);
outbound.encode();
Message amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage()).getProtonMessage();
AMQPMessage amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage());
assertNotNull(amqp.getBody());
assertTrue(amqp.getBody() instanceof Data);
@ -321,7 +321,7 @@ public class JMSMappingOutboundTransformerTest {
outbound.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_UNKNOWN);
outbound.encode();
Message amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage()).getProtonMessage();
AMQPMessage amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage());
assertNotNull(amqp.getBody());
assertTrue(amqp.getBody() instanceof Data);
@ -338,7 +338,7 @@ public class JMSMappingOutboundTransformerTest {
outbound.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_VALUE_BINARY);
outbound.encode();
Message amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage()).getProtonMessage();
AMQPMessage amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage());
assertNotNull(amqp.getBody());
assertTrue(amqp.getBody() instanceof AmqpValue);
@ -356,7 +356,7 @@ public class JMSMappingOutboundTransformerTest {
outbound.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_VALUE_BINARY);
outbound.encode();
Message amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage()).getProtonMessage();
AMQPMessage amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage());
assertNotNull(amqp.getBody());
assertTrue(amqp.getBody() instanceof AmqpValue);
@ -371,7 +371,7 @@ public class JMSMappingOutboundTransformerTest {
ServerJMSTextMessage outbound = createTextMessage();
outbound.encode();
Message amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage()).getProtonMessage();
AMQPMessage amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage());
assertNotNull(amqp.getBody());
assertTrue(amqp.getBody() instanceof AmqpValue);
@ -384,7 +384,7 @@ public class JMSMappingOutboundTransformerTest {
ServerJMSTextMessage outbound = createTextMessage(contentString);
outbound.encode();
Message amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage()).getProtonMessage();
AMQPMessage amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage());
assertNotNull(amqp.getBody());
assertTrue(amqp.getBody() instanceof AmqpValue);
@ -397,7 +397,7 @@ public class JMSMappingOutboundTransformerTest {
ServerJMSTextMessage outbound = createTextMessage(contentString);
outbound.encode();
Message amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage()).getProtonMessage();
AMQPMessage amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage());
assertNotNull(amqp.getBody());
assertTrue(amqp.getBody() instanceof AmqpValue);
@ -410,7 +410,7 @@ public class JMSMappingOutboundTransformerTest {
ServerJMSTextMessage outbound = createTextMessage(contentString);
outbound.encode();
Message amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage()).getProtonMessage();
AMQPMessage amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage());
assertNotNull(amqp.getBody());
assertTrue(amqp.getBody() instanceof AmqpValue);
@ -427,7 +427,7 @@ public class JMSMappingOutboundTransformerTest {
outbound.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_DATA);
outbound.encode();
Message amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage()).getProtonMessage();
AMQPMessage amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage());
assertNotNull(amqp.getBody());
assertTrue(amqp.getBody() instanceof Data);
@ -445,7 +445,7 @@ public class JMSMappingOutboundTransformerTest {
outbound.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_DATA);
outbound.encode();
Message amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage()).getProtonMessage();
AMQPMessage amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage());
assertNotNull(amqp.getBody());
assertTrue(amqp.getBody() instanceof Data);
@ -473,7 +473,7 @@ public class JMSMappingOutboundTransformerTest {
textMessage.setText("myTextMessageContent");
textMessage.setJMSDestination(jmsDestination);
Message amqp = AMQPConverter.getInstance().fromCore(textMessage.getInnerMessage()).getProtonMessage();
AMQPMessage amqp = AMQPConverter.getInstance().fromCore(textMessage.getInnerMessage());
MessageAnnotations ma = amqp.getMessageAnnotations();
Map<Symbol, Object> maMap = ma == null ? null : ma.getValue();
@ -506,7 +506,7 @@ public class JMSMappingOutboundTransformerTest {
textMessage.setText("myTextMessageContent");
textMessage.setJMSReplyTo(jmsReplyTo);
Message amqp = AMQPConverter.getInstance().fromCore(textMessage.getInnerMessage()).getProtonMessage();
AMQPMessage amqp = AMQPConverter.getInstance().fromCore(textMessage.getInnerMessage());
MessageAnnotations ma = amqp.getMessageAnnotations();
Map<Symbol, Object> maMap = ma == null ? null : ma.getValue();
@ -518,7 +518,7 @@ public class JMSMappingOutboundTransformerTest {
}
if (jmsReplyTo != null) {
assertEquals("Unexpected 'reply-to' address", jmsReplyTo.getAddress(), amqp.getReplyTo());
assertEquals("Unexpected 'reply-to' address", jmsReplyTo.getSimpleAddress(), amqp.getReplyTo());
}
}

View File

@ -21,23 +21,27 @@ import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import org.apache.activemq.artemis.api.core.ICoreMessage;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
import org.apache.activemq.artemis.protocol.amqp.converter.AMQPConverter;
import org.apache.activemq.artemis.protocol.amqp.util.NettyReadable;
import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable;
import org.apache.qpid.proton.Proton;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.AmqpValue;
import org.apache.qpid.proton.amqp.messaging.ApplicationProperties;
import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
import org.apache.qpid.proton.message.Message;
import org.apache.qpid.proton.message.impl.MessageImpl;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.buffer.Unpooled;
/**
* Some simple performance tests for the Message Transformers.
*/
@ -56,10 +60,9 @@ public class JMSTransformationSpeedComparisonTest {
@Test
public void testBodyOnlyMessage() throws Exception {
Message message = Proton.message();
MessageImpl message = (MessageImpl) Proton.message();
message.setBody(new AmqpValue("String payload for AMQP message conversion performance testing."));
AMQPMessage encoded = new AMQPMessage(message);
AMQPMessage encoded = encodeAndCreateAMQPMessage(message);
// Warm up
for (int i = 0; i < WARM_CYCLES; ++i) {
@ -81,8 +84,7 @@ public class JMSTransformationSpeedComparisonTest {
@Test
public void testMessageWithNoPropertiesOrAnnotations() throws Exception {
Message message = Proton.message();
MessageImpl message = (MessageImpl) Proton.message();
message.setAddress("queue://test-queue");
message.setDeliveryCount(1);
@ -90,7 +92,7 @@ public class JMSTransformationSpeedComparisonTest {
message.setContentType("text/plain");
message.setBody(new AmqpValue("String payload for AMQP message conversion performance testing."));
AMQPMessage encoded = new AMQPMessage(message);
AMQPMessage encoded = encodeAndCreateAMQPMessage(message);
// Warm up
for (int i = 0; i < WARM_CYCLES; ++i) {
@ -112,8 +114,7 @@ public class JMSTransformationSpeedComparisonTest {
@Test
public void testTypicalQpidJMSMessage() throws Exception {
AMQPMessage encoded = new AMQPMessage(createTypicalQpidJMSMessage());
AMQPMessage encoded = encodeAndCreateAMQPMessage(createTypicalQpidJMSMessage());
// Warm up
for (int i = 0; i < WARM_CYCLES; ++i) {
@ -136,7 +137,7 @@ public class JMSTransformationSpeedComparisonTest {
@Test
public void testComplexQpidJMSMessage() throws Exception {
AMQPMessage encoded = encode(createComplexQpidJMSMessage());
AMQPMessage encoded = encodeAndCreateAMQPMessage(createComplexQpidJMSMessage());
// Warm up
for (int i = 0; i < WARM_CYCLES; ++i) {
@ -159,7 +160,7 @@ public class JMSTransformationSpeedComparisonTest {
@Test
public void testTypicalQpidJMSMessageInBoundOnly() throws Exception {
AMQPMessage encoded = encode(createTypicalQpidJMSMessage());
AMQPMessage encoded = encodeAndCreateAMQPMessage(createTypicalQpidJMSMessage());
// Warm up
for (int i = 0; i < WARM_CYCLES; ++i) {
@ -182,8 +183,7 @@ public class JMSTransformationSpeedComparisonTest {
@Test
public void testTypicalQpidJMSMessageOutBoundOnly() throws Exception {
AMQPMessage encoded = encode(createTypicalQpidJMSMessage());
AMQPMessage encoded = encodeAndCreateAMQPMessage(createTypicalQpidJMSMessage());
// Warm up
for (int i = 0; i < WARM_CYCLES; ++i) {
@ -204,7 +204,7 @@ public class JMSTransformationSpeedComparisonTest {
LOG_RESULTS(totalDuration);
}
private Message createTypicalQpidJMSMessage() {
private MessageImpl createTypicalQpidJMSMessage() {
Map<String, Object> applicationProperties = new HashMap<>();
Map<Symbol, Object> messageAnnotations = new HashMap<>();
@ -215,7 +215,7 @@ public class JMSTransformationSpeedComparisonTest {
messageAnnotations.put(Symbol.valueOf("x-opt-jms-msg-type"), 0);
messageAnnotations.put(Symbol.valueOf("x-opt-jms-dest"), 0);
Message message = Proton.message();
MessageImpl message = (MessageImpl) Proton.message();
message.setAddress("queue://test-queue");
message.setDeliveryCount(1);
@ -228,7 +228,7 @@ public class JMSTransformationSpeedComparisonTest {
return message;
}
private Message createComplexQpidJMSMessage() {
private MessageImpl createComplexQpidJMSMessage() {
Map<String, Object> applicationProperties = new HashMap<>();
Map<Symbol, Object> messageAnnotations = new HashMap<>();
@ -245,7 +245,7 @@ public class JMSTransformationSpeedComparisonTest {
messageAnnotations.put(Symbol.valueOf("x-opt-jms-msg-type"), 0);
messageAnnotations.put(Symbol.valueOf("x-opt-jms-dest"), 0);
Message message = Proton.message();
MessageImpl message = (MessageImpl) Proton.message();
// Header Values
message.setPriority((short) 9);
@ -272,8 +272,13 @@ public class JMSTransformationSpeedComparisonTest {
return message;
}
private AMQPMessage encode(Message message) {
return new AMQPMessage(message);
private AMQPMessage encodeAndCreateAMQPMessage(MessageImpl message) {
NettyWritable encoded = new NettyWritable(Unpooled.buffer(1024));
message.encode(encoded);
NettyReadable readable = new NettyReadable(encoded.getByteBuf());
return new AMQPMessage(AMQPMessage.DEFAULT_MESSAGE_FORMAT, readable, null, null);
}
private void encode(AMQPMessage target) {

View File

@ -16,6 +16,11 @@
*/
package org.apache.activemq.artemis.protocol.amqp.converter.message;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
@ -23,20 +28,20 @@ import java.util.Map;
import org.apache.activemq.artemis.api.core.ICoreMessage;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
import org.apache.activemq.artemis.protocol.amqp.converter.AMQPConverter;
import org.apache.activemq.artemis.protocol.amqp.util.NettyReadable;
import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable;
import org.apache.qpid.proton.Proton;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.AmqpValue;
import org.apache.qpid.proton.amqp.messaging.ApplicationProperties;
import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
import org.apache.qpid.proton.message.Message;
import org.junit.Before;
import org.apache.qpid.proton.amqp.messaging.Section;
import org.apache.qpid.proton.message.impl.MessageImpl;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import io.netty.buffer.Unpooled;
/**
* Tests some basic encode / decode functionality on the transformers.
@ -46,34 +51,32 @@ public class MessageTransformationTest {
@Rule
public TestName test = new TestName();
@Before
public void setUp() {
}
@Test
public void testBodyOnlyEncodeDecode() throws Exception {
Message incomingMessage = Proton.message();
MessageImpl incomingMessage = (MessageImpl) Proton.message();
incomingMessage.setBody(new AmqpValue("String payload for AMQP message conversion performance testing."));
ICoreMessage core = new AMQPMessage(incomingMessage).toCore();
Message outboudMessage = AMQPConverter.getInstance().fromCore(core).getProtonMessage();
ICoreMessage core = encodeAndCreateAMQPMessage(incomingMessage).toCore();
AMQPMessage outboudMessage = AMQPConverter.getInstance().fromCore(core);
assertNull(outboudMessage.getHeader());
Section body = outboudMessage.getBody();
assertNotNull(body);
assertTrue(body instanceof AmqpValue);
assertTrue(((AmqpValue) body).getValue() instanceof String);
}
@Test
public void testPropertiesButNoHeadersEncodeDecode() throws Exception {
Message incomingMessage = Proton.message();
MessageImpl incomingMessage = (MessageImpl) Proton.message();
incomingMessage.setBody(new AmqpValue("String payload for AMQP message conversion performance testing."));
incomingMessage.setMessageId("ID:SomeQualifier:0:0:1");
ICoreMessage core = new AMQPMessage(incomingMessage).toCore();
Message outboudMessage = AMQPConverter.getInstance().fromCore(core).getProtonMessage();
ICoreMessage core = encodeAndCreateAMQPMessage(incomingMessage).toCore();
AMQPMessage outboudMessage = AMQPConverter.getInstance().fromCore(core);
assertNull(outboudMessage.getHeader());
assertNotNull(outboudMessage.getProperties());
@ -81,20 +84,24 @@ public class MessageTransformationTest {
@Test
public void testHeaderButNoPropertiesEncodeDecode() throws Exception {
Message incomingMessage = Proton.message();
MessageImpl incomingMessage = (MessageImpl) Proton.message();
incomingMessage.setBody(new AmqpValue("String payload for AMQP message conversion performance testing."));
incomingMessage.setDurable(true);
ICoreMessage core = new AMQPMessage(incomingMessage).toCore();
Message outboudMessage = AMQPConverter.getInstance().fromCore(core).getProtonMessage();
ICoreMessage core = encodeAndCreateAMQPMessage(incomingMessage).toCore();
AMQPMessage outboudMessage = AMQPConverter.getInstance().fromCore(core);
assertNotNull(outboudMessage.getHeader());
Section body = outboudMessage.getBody();
assertNotNull(body);
assertTrue(body instanceof AmqpValue);
assertTrue(((AmqpValue) body).getValue() instanceof String);
}
@Test
public void testComplexQpidJMSMessageEncodeDecode() throws Exception {
Map<String, Object> applicationProperties = new HashMap<>();
Map<Symbol, Object> messageAnnotations = new HashMap<>();
@ -113,7 +120,7 @@ public class MessageTransformationTest {
messageAnnotations.put(Symbol.valueOf("x-opt-jms-reply-to"), 0);
messageAnnotations.put(Symbol.valueOf("x-opt-delivery-delay"), 2000);
Message message = Proton.message();
MessageImpl message = (MessageImpl) Proton.message();
// Header Values
message.setPriority((short) 9);
@ -137,10 +144,19 @@ public class MessageTransformationTest {
message.setMessageAnnotations(new MessageAnnotations(messageAnnotations));
message.setBody(new AmqpValue("String payload for AMQP message conversion performance testing."));
ICoreMessage core = new AMQPMessage(message).toCore();
Message outboudMessage = AMQPConverter.getInstance().fromCore(core).getProtonMessage();
ICoreMessage core = encodeAndCreateAMQPMessage(message).toCore();
AMQPMessage outboudMessage = AMQPConverter.getInstance().fromCore(core);
assertEquals(10, outboudMessage.getApplicationProperties().getValue().size());
assertEquals(4, outboudMessage.getMessageAnnotations().getValue().size());
}
private AMQPMessage encodeAndCreateAMQPMessage(MessageImpl message) {
NettyWritable encoded = new NettyWritable(Unpooled.buffer(1024));
message.encode(encoded);
NettyReadable readable = new NettyReadable(encoded.getByteBuf());
return new AMQPMessage(AMQPMessage.DEFAULT_MESSAGE_FORMAT, readable, null, null);
}
}

View File

@ -1,438 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.protocol.amqp.message;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.nio.charset.StandardCharsets;
import java.util.Date;
import java.util.HashMap;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
import org.apache.activemq.artemis.api.core.ICoreMessage;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessagePersisterV2;
import org.apache.activemq.artemis.protocol.amqp.util.NettyReadable;
import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable;
import org.apache.activemq.artemis.protocol.amqp.util.TLSEncode;
import org.apache.activemq.artemis.spi.core.protocol.EmbedMessageUtil;
import org.apache.activemq.artemis.utils.RandomUtil;
import org.apache.qpid.proton.amqp.UnsignedByte;
import org.apache.qpid.proton.amqp.UnsignedInteger;
import org.apache.qpid.proton.amqp.UnsignedLong;
import org.apache.qpid.proton.amqp.messaging.ApplicationProperties;
import org.apache.qpid.proton.amqp.messaging.Header;
import org.apache.qpid.proton.amqp.messaging.Properties;
import org.apache.qpid.proton.codec.EncoderImpl;
import org.apache.qpid.proton.codec.EncodingCodes;
import org.apache.qpid.proton.codec.ReadableBuffer;
import org.apache.qpid.proton.message.Message;
import org.apache.qpid.proton.message.impl.MessageImpl;
import org.junit.Assert;
import org.junit.Test;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
public class AMQPMessageTest {
@Test
public void testVerySimple() {
MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
protonMessage.setHeader( new Header());
Properties properties = new Properties();
properties.setTo("someNiceLocal");
protonMessage.setProperties(properties);
protonMessage.getHeader().setDeliveryCount(new UnsignedInteger(7));
protonMessage.getHeader().setDurable(Boolean.TRUE);
protonMessage.setApplicationProperties(new ApplicationProperties(new HashMap<>()));
AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
assertEquals(7, decoded.getHeader().getDeliveryCount().intValue());
assertEquals(true, decoded.getHeader().getDurable());
assertEquals("someNiceLocal", decoded.getAddress());
}
@Test
public void testApplicationPropertiesReencode() {
MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
protonMessage.setHeader( new Header());
Properties properties = new Properties();
properties.setTo("someNiceLocal");
protonMessage.setProperties(properties);
protonMessage.getHeader().setDeliveryCount(new UnsignedInteger(7));
protonMessage.getHeader().setDurable(Boolean.TRUE);
HashMap<String, Object> map = new HashMap<>();
map.put("key", "string1");
protonMessage.setApplicationProperties(new ApplicationProperties(map));
AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
assertEquals("someNiceLocal", decoded.getAddress());
decoded.setAddress("newAddress");
decoded.reencode();
assertEquals(7, decoded.getHeader().getDeliveryCount().intValue());
assertEquals(true, decoded.getHeader().getDurable());
assertEquals("newAddress", decoded.getAddress());
assertEquals("string1", decoded.getObjectProperty("key"));
// validate if the message will be the same after delivery
AMQPMessage newDecoded = encodeDelivery(decoded, 3);
assertEquals(2, decoded.getHeader().getDeliveryCount().intValue());
assertEquals(true, newDecoded.getHeader().getDurable());
assertEquals("newAddress", newDecoded.getAddress());
assertEquals("string1", newDecoded.getObjectProperty("key"));
}
@Test
public void testGetAddressFromMessage() {
final String ADDRESS = "myQueue";
MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
protonMessage.setHeader(new Header());
protonMessage.setAddress(ADDRESS);
AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
assertEquals(ADDRESS, decoded.getAddress());
}
@Test
public void testGetAddressSimpleStringFromMessage() {
final String ADDRESS = "myQueue";
MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
protonMessage.setHeader(new Header());
protonMessage.setAddress(ADDRESS);
AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
assertEquals(ADDRESS, decoded.getAddressSimpleString().toString());
}
@Test
public void testGetAddressFromMessageWithNoValueSet() {
MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
assertNull(decoded.getAddress());
assertNull(decoded.getAddressSimpleString());
}
@Test
public void testIsDurableFromMessage() {
MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
protonMessage.setHeader(new Header());
protonMessage.setDurable(true);
AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
assertTrue(decoded.isDurable());
}
@Test
public void testIsDurableFromMessageWithNoValueSet() {
MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
assertFalse(decoded.isDurable());
}
@Test
public void testGetGroupIDFromMessage() {
final String GROUP_ID = "group-1";
MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
protonMessage.setHeader(new Header());
protonMessage.setGroupId(GROUP_ID);
AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
assertEquals(GROUP_ID, decoded.getGroupID().toString());
}
@Test
public void testGetGroupIDFromMessageWithNoGroupId() {
MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
assertNull(decoded.getUserID());
}
@Test
public void testGetUserIDFromMessage() {
final String USER_NAME = "foo";
MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
protonMessage.setHeader(new Header());
protonMessage.setUserId(USER_NAME.getBytes(StandardCharsets.UTF_8));
AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
assertEquals(USER_NAME, decoded.getAMQPUserID());
}
@Test
public void testGetUserIDFromMessageWithNoUserID() {
MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
assertNull(decoded.getUserID());
}
@Test
public void testGetPriorityFromMessage() {
final short PRIORITY = 7;
MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
protonMessage.setHeader(new Header());
protonMessage.setPriority(PRIORITY);
AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
assertEquals(PRIORITY, decoded.getPriority());
}
@Test
public void testGetPriorityFromMessageWithNoPrioritySet() {
MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
assertEquals(AMQPMessage.DEFAULT_MESSAGE_PRIORITY, decoded.getPriority());
}
@Test
public void testGetTimestampFromMessage() {
Date timestamp = new Date(System.currentTimeMillis());
MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
protonMessage.setHeader( new Header());
Properties properties = new Properties();
properties.setCreationTime(timestamp);
protonMessage.setProperties(properties);
AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
assertEquals(timestamp.getTime(), decoded.getTimestamp());
}
@Test
public void testGetTimestampFromMessageWithNoCreateTimeSet() {
MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
protonMessage.setHeader( new Header());
AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
assertEquals(0L, decoded.getTimestamp());
}
@Test
public void testExtraProperty() {
MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
byte[] original = RandomUtil.randomBytes();
SimpleString name = SimpleString.toSimpleString("myProperty");
AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
decoded.setAddress("someAddress");
decoded.setMessageID(33);
decoded.putExtraBytesProperty(name, original);
ICoreMessage coreMessage = decoded.toCore();
Assert.assertEquals(original, coreMessage.getBytesProperty(name));
ActiveMQBuffer buffer = ActiveMQBuffers.pooledBuffer(10 * 1024);
try {
decoded.getPersister().encode(buffer, decoded);
Assert.assertEquals(AMQPMessagePersisterV2.getInstance().getID(), buffer.readByte()); // the journal reader will read 1 byte to find the persister
AMQPMessage readMessage = (AMQPMessage)decoded.getPersister().decode(buffer, null);
Assert.assertEquals(33, readMessage.getMessageID());
Assert.assertEquals("someAddress", readMessage.getAddress());
Assert.assertArrayEquals(original, readMessage.getExtraBytesProperty(name));
} finally {
buffer.release();
}
{
ICoreMessage embeddedMessage = EmbedMessageUtil.embedAsCoreMessage(decoded);
AMQPMessage readMessage = (AMQPMessage) EmbedMessageUtil.extractEmbedded(embeddedMessage);
Assert.assertEquals(33, readMessage.getMessageID());
Assert.assertEquals("someAddress", readMessage.getAddress());
Assert.assertArrayEquals(original, readMessage.getExtraBytesProperty(name));
}
}
private static final UnsignedLong AMQPVALUE_DESCRIPTOR = UnsignedLong.valueOf(0x0000000000000077L);
private static final UnsignedLong APPLICATION_PROPERTIES_DESCRIPTOR = UnsignedLong.valueOf(0x0000000000000074L);
private static final UnsignedLong DELIVERY_ANNOTATIONS_DESCRIPTOR = UnsignedLong.valueOf(0x0000000000000071L);
@Test
public void testPartialDecodeIgnoresDeliveryAnnotationsByDefault() {
Header header = new Header();
header.setDurable(true);
header.setPriority(UnsignedByte.valueOf((byte) 6));
ByteBuf encodedBytes = Unpooled.buffer(1024);
NettyWritable writable = new NettyWritable(encodedBytes);
EncoderImpl encoder = TLSEncode.getEncoder();
encoder.setByteBuffer(writable);
encoder.writeObject(header);
// Signal body of AmqpValue but write corrupt underlying type info
encodedBytes.writeByte(EncodingCodes.DESCRIBED_TYPE_INDICATOR);
encodedBytes.writeByte(EncodingCodes.SMALLULONG);
encodedBytes.writeByte(DELIVERY_ANNOTATIONS_DESCRIPTOR.byteValue());
encodedBytes.writeByte(EncodingCodes.MAP8);
encodedBytes.writeByte(2); // Size
encodedBytes.writeByte(2); // Elements
// Use bad encoding code on underlying type of map key which will fail the decode if run
encodedBytes.writeByte(255);
ReadableBuffer readable = new NettyReadable(encodedBytes);
AMQPMessage message = null;
try {
message = new AMQPMessage(0, readable, null, null);
} catch (Exception decodeError) {
fail("Should not have encountered an exception on partial decode: " + decodeError.getMessage());
}
try {
// This should perform the lazy decode of the DeliveryAnnotations portion of the message
message.reencode();
fail("Should have thrown an error when attempting to decode the ApplicationProperties which are malformed.");
} catch (Exception ex) {
// Expected decode to fail when building full message.
}
}
@Test
public void testPartialDecodeIgnoresApplicationPropertiesByDefault() {
Header header = new Header();
header.setDurable(true);
header.setPriority(UnsignedByte.valueOf((byte) 6));
ByteBuf encodedBytes = Unpooled.buffer(1024);
NettyWritable writable = new NettyWritable(encodedBytes);
EncoderImpl encoder = TLSEncode.getEncoder();
encoder.setByteBuffer(writable);
encoder.writeObject(header);
// Signal body of AmqpValue but write corrupt underlying type info
encodedBytes.writeByte(EncodingCodes.DESCRIBED_TYPE_INDICATOR);
encodedBytes.writeByte(EncodingCodes.SMALLULONG);
encodedBytes.writeByte(APPLICATION_PROPERTIES_DESCRIPTOR.byteValue());
// Use bad encoding code on underlying type
encodedBytes.writeByte(255);
ReadableBuffer readable = new NettyReadable(encodedBytes);
AMQPMessage message = null;
try {
message = new AMQPMessage(0, readable, null, null);
} catch (Exception decodeError) {
fail("Should not have encountered an exception on partial decode: " + decodeError.getMessage());
}
assertTrue(message.isDurable());
try {
// This should perform the lazy decode of the ApplicationProperties portion of the message
message.getStringProperty("test");
fail("Should have thrown an error when attempting to decode the ApplicationProperties which are malformed.");
} catch (Exception ex) {
// Expected decode to fail when building full message.
}
}
@Test
public void testPartialDecodeIgnoresBodyByDefault() {
Header header = new Header();
header.setDurable(true);
header.setPriority(UnsignedByte.valueOf((byte) 6));
ByteBuf encodedBytes = Unpooled.buffer(1024);
NettyWritable writable = new NettyWritable(encodedBytes);
EncoderImpl encoder = TLSEncode.getEncoder();
encoder.setByteBuffer(writable);
encoder.writeObject(header);
// Signal body of AmqpValue but write corrupt underlying type info
encodedBytes.writeByte(EncodingCodes.DESCRIBED_TYPE_INDICATOR);
encodedBytes.writeByte(EncodingCodes.SMALLULONG);
encodedBytes.writeByte(AMQPVALUE_DESCRIPTOR.byteValue());
// Use bad encoding code on underlying type
encodedBytes.writeByte(255);
ReadableBuffer readable = new NettyReadable(encodedBytes);
AMQPMessage message = null;
try {
message = new AMQPMessage(0, readable, null, null);
} catch (Exception decodeError) {
fail("Should not have encountered an exception on partial decode: " + decodeError.getMessage());
}
assertTrue(message.isDurable());
try {
// This will decode the body section if present in order to present it as a Proton Message object
message.getProtonMessage();
fail("Should have thrown an error when attempting to decode the body which is malformed.");
} catch (Exception ex) {
// Expected decode to fail when building full message.
}
}
private AMQPMessage encodeAndDecodeMessage(MessageImpl message) {
ByteBuf nettyBuffer = Unpooled.buffer(1500);
message.encode(new NettyWritable(nettyBuffer));
byte[] bytes = new byte[nettyBuffer.writerIndex()];
nettyBuffer.readBytes(bytes);
return new AMQPMessage(0, bytes, null);
}
private AMQPMessage encodeDelivery(AMQPMessage message, int deliveryCount) {
ByteBuf nettyBuffer = Unpooled.buffer(1500);
message.sendBuffer(nettyBuffer, deliveryCount);
byte[] bytes = new byte[nettyBuffer.writerIndex()];
nettyBuffer.readBytes(bytes);
return new AMQPMessage(0, bytes, null);
}
}

View File

@ -22,6 +22,7 @@ import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import java.nio.ByteBuffer;
import java.util.Arrays;
import org.apache.qpid.proton.codec.ReadableBuffer;
import org.junit.Test;
@ -125,6 +126,28 @@ public class NettyWritableTest {
doPutReadableBufferTestImpl(false);
}
@Test
public void testPutReadableBufferWithOffsetAndNonZeroPosition() {
ByteBuf buffer = Unpooled.buffer(1024);
NettyWritable writable = new NettyWritable(buffer);
ByteBuffer source = ByteBuffer.allocate(20);
source.put(new byte[] {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19});
source.position(5);
source.limit(10);
writable.put(source);
assertEquals(5, writable.position());
assertEquals(5, buffer.readableBytes());
byte[] check = new byte[5];
buffer.readBytes(check);
assertTrue(Arrays.equals(new byte[] {5, 6, 7, 8, 9}, check));
}
private void doPutReadableBufferTestImpl(boolean readOnly) {
ByteBuffer buf = ByteBuffer.allocate(1024);
buf.put((byte) 1);

View File

@ -27,7 +27,6 @@ import org.apache.activemq.transport.amqp.client.AmqpMessage;
import org.apache.activemq.transport.amqp.client.AmqpReceiver;
import org.apache.activemq.transport.amqp.client.AmqpSender;
import org.apache.activemq.transport.amqp.client.AmqpSession;
import org.junit.Assert;
import org.junit.Test;
public class AmqpExpiredMessageTest extends AmqpClientTestSupport {
@ -50,7 +49,7 @@ public class AmqpExpiredMessageTest extends AmqpClientTestSupport {
sender.send(message);
sender.close();
assertEquals(1, queueView.getMessageCount());
Wait.assertEquals(1, queueView::getMessageCount);
// Now try and get the message
AmqpReceiver receiver = session.createReceiver(getQueueName());
@ -101,6 +100,10 @@ public class AmqpExpiredMessageTest extends AmqpClientTestSupport {
server.stop();
server.start();
final Queue dlqView = getProxyToQueue(getDeadLetterAddress());
assertNotNull(dlqView);
Wait.assertEquals(1, dlqView::getMessageCount);
client = createAmqpClient();
connection = addConnection(client.connect());
session = connection.createSession();
@ -108,10 +111,11 @@ public class AmqpExpiredMessageTest extends AmqpClientTestSupport {
AmqpReceiver receiverDLQ = session.createReceiver(getDeadLetterAddress());
receiverDLQ.flow(1);
received = receiverDLQ.receive(5, TimeUnit.SECONDS);
Assert.assertEquals(1, received.getTimeToLive());
System.out.println("received.heandler.TTL" + received.getTimeToLive());
Assert.assertNotNull(received);
Assert.assertEquals("Value1", received.getApplicationProperty("key1"));
assertNotNull("Should have read message from DLQ", received);
assertEquals(0, received.getTimeToLive());
assertNotNull(received);
assertEquals("Value1", received.getApplicationProperty("key1"));
connection.close();
}

View File

@ -29,11 +29,16 @@ import org.apache.activemq.artemis.core.protocol.core.impl.CoreProtocolManagerFa
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
import org.apache.activemq.artemis.protocol.amqp.broker.ProtonProtocolManagerFactory;
import org.apache.activemq.artemis.protocol.amqp.util.NettyReadable;
import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.qpid.proton.message.Message;
import org.apache.qpid.proton.message.impl.MessageImpl;
import org.junit.Assert;
import org.junit.Test;
import io.netty.buffer.Unpooled;
public class MessageJournalTest extends ActiveMQTestBase {
@Test
@ -78,10 +83,8 @@ public class MessageJournalTest extends ActiveMQTestBase {
} finally {
journalStorageManager.getMessageJournal().stop();
}
}
@Test
public void testStoreAMQP() throws Throwable {
ActiveMQServer server = createServer(true);
@ -90,9 +93,9 @@ public class MessageJournalTest extends ActiveMQTestBase {
ProtonProtocolManagerFactory factory = (ProtonProtocolManagerFactory) server.getRemotingService().getProtocolFactoryMap().get("AMQP");
Message protonJMessage = Message.Factory.create();
MessageImpl protonJMessage = (MessageImpl) Message.Factory.create();
AMQPMessage message = new AMQPMessage(protonJMessage);
AMQPMessage message = encodeAndCreateAMQPMessage(protonJMessage);
message.setMessageID(333);
@ -117,14 +120,19 @@ public class MessageJournalTest extends ActiveMQTestBase {
try {
journalStorageManager.getMessageJournal().start();
journalStorageManager.getMessageJournal().load(committedRecords, preparedTransactions, transactionFailure);
Assert.assertEquals(1, committedRecords.size());
} finally {
journalStorageManager.getMessageJournal().stop();
}
}
private AMQPMessage encodeAndCreateAMQPMessage(MessageImpl message) {
NettyWritable encoded = new NettyWritable(Unpooled.buffer(1024));
message.encode(encoded);
NettyReadable readable = new NettyReadable(encoded.getByteBuf());
return new AMQPMessage(AMQPMessage.DEFAULT_MESSAGE_FORMAT, readable, null, null);
}
}