This commit is contained in:
Clebert Suconic 2018-09-26 09:19:41 -04:00
commit 7a463f038a
16 changed files with 3581 additions and 1452 deletions

View File

@ -50,7 +50,6 @@ public class AMQPMessagePersister extends MessagePersister {
SimpleString.sizeofNullableString(record.getAddressSimpleString()) + DataConstants.SIZE_LONG + DataConstants.SIZE_LONG;
}
/** Sub classes must add the first short as the protocol-id */
@Override
public void encode(ActiveMQBuffer buffer, Message record) {
@ -62,7 +61,6 @@ public class AMQPMessagePersister extends MessagePersister {
record.persist(buffer);
}
@Override
public Message decode(ActiveMQBuffer buffer, Message record) {
long id = buffer.readLong();
@ -76,5 +74,4 @@ public class AMQPMessagePersister extends MessagePersister {
}
return record;
}
}

View File

@ -63,9 +63,22 @@ public final class AMQPMessageSupport {
/**
* Attribute used to mark the Application defined delivery time assigned to the message
*
* @deprecated Use the SCHEDULED_DELIVERY_TIME value as this is not JMS specific and will be removed.
*/
@Deprecated
public static final Symbol JMS_DELIVERY_TIME = Symbol.getSymbol("x-opt-delivery-time");
/**
* Attribute used to mark the Application defined delivery time assigned to the message
*/
public static final Symbol SCHEDULED_DELIVERY_TIME = Symbol.getSymbol("x-opt-delivery-time");
/**
* Attribute used to mark the Application defined delivery time assigned to the message
*/
public static final Symbol SCHEDULED_DELIVERY_DELAY = Symbol.getSymbol("x-opt-delivery-delay");
/**
* Attribute used to mark the Application defined delivery time assigned to the message
*/
@ -226,6 +239,24 @@ public final class AMQPMessageSupport {
}
}
/**
* Check whether the content-type given matches the expect value.
*
* @param expected
* content type string to compare against or null if not expected to be set
* @param actual
* the AMQP content type symbol from the Properties section
*
* @return true if content type matches
*/
public static boolean isContentType(String expected, Symbol actual) {
if (expected == null) {
return actual == null;
} else {
return expected.equals(actual != null ? actual.toString() : actual);
}
}
/**
* @param contentType
* the contentType of the received message

View File

@ -98,23 +98,29 @@ import io.netty.buffer.PooledByteBufAllocator;
* */
public class AmqpCoreConverter {
@SuppressWarnings("unchecked")
public static ICoreMessage toCore(AMQPMessage message, CoreMessageObjectPools coreMessageObjectPools) throws Exception {
return message.toCore(coreMessageObjectPools);
}
@SuppressWarnings("unchecked")
public static ICoreMessage toCore(AMQPMessage message, CoreMessageObjectPools coreMessageObjectPools, Header header, MessageAnnotations annotations, Properties properties, ApplicationProperties applicationProperties, Section body, Footer footer) throws Exception {
final long messageId = message.getMessageID();
final Symbol contentType = properties != null ? properties.getContentType() : null;
final String contentTypeString = contentType != null ? contentType.toString() : null;
Section body = message.getProtonMessage().getBody();
ServerJMSMessage result;
if (body == null) {
if (isContentType(SERIALIZED_JAVA_OBJECT_CONTENT_TYPE.toString(), message.getProtonMessage())) {
result = createObjectMessage(message.getMessageID(), coreMessageObjectPools);
} else if (isContentType(OCTET_STREAM_CONTENT_TYPE, message.getProtonMessage()) || isContentType(null, message.getProtonMessage())) {
result = createBytesMessage(message.getMessageID(), coreMessageObjectPools);
if (isContentType(SERIALIZED_JAVA_OBJECT_CONTENT_TYPE.toString(), contentType)) {
result = createObjectMessage(messageId, coreMessageObjectPools);
} else if (isContentType(OCTET_STREAM_CONTENT_TYPE, contentType) || isContentType(null, contentType)) {
result = createBytesMessage(messageId, coreMessageObjectPools);
} else {
Charset charset = getCharsetForTextualContent(message.getProtonMessage().getContentType());
Charset charset = getCharsetForTextualContent(contentTypeString);
if (charset != null) {
result = createTextMessage(message.getMessageID(), coreMessageObjectPools);
result = createTextMessage(messageId, coreMessageObjectPools);
} else {
result = createMessage(message.getMessageID(), coreMessageObjectPools);
result = createMessage(messageId, coreMessageObjectPools);
}
}
@ -122,30 +128,30 @@ public class AmqpCoreConverter {
} else if (body instanceof Data) {
Binary payload = ((Data) body).getValue();
if (isContentType(SERIALIZED_JAVA_OBJECT_CONTENT_TYPE.toString(), message.getProtonMessage())) {
result = createObjectMessage(message.getMessageID(), payload.getArray(), payload.getArrayOffset(), payload.getLength(), coreMessageObjectPools);
} else if (isContentType(OCTET_STREAM_CONTENT_TYPE, message.getProtonMessage())) {
result = createBytesMessage(message.getMessageID(), payload.getArray(), payload.getArrayOffset(), payload.getLength(), coreMessageObjectPools);
if (isContentType(SERIALIZED_JAVA_OBJECT_CONTENT_TYPE.toString(), contentType)) {
result = createObjectMessage(messageId, payload.getArray(), payload.getArrayOffset(), payload.getLength(), coreMessageObjectPools);
} else if (isContentType(OCTET_STREAM_CONTENT_TYPE, contentType)) {
result = createBytesMessage(messageId, payload.getArray(), payload.getArrayOffset(), payload.getLength(), coreMessageObjectPools);
} else {
Charset charset = getCharsetForTextualContent(message.getProtonMessage().getContentType());
Charset charset = getCharsetForTextualContent(contentTypeString);
if (StandardCharsets.UTF_8.equals(charset)) {
ByteBuffer buf = ByteBuffer.wrap(payload.getArray(), payload.getArrayOffset(), payload.getLength());
try {
CharBuffer chars = charset.newDecoder().decode(buf);
result = createTextMessage(message.getMessageID(), String.valueOf(chars), coreMessageObjectPools);
result = createTextMessage(messageId, String.valueOf(chars), coreMessageObjectPools);
} catch (CharacterCodingException e) {
result = createBytesMessage(message.getMessageID(), payload.getArray(), payload.getArrayOffset(), payload.getLength(), coreMessageObjectPools);
result = createBytesMessage(messageId, payload.getArray(), payload.getArrayOffset(), payload.getLength(), coreMessageObjectPools);
}
} else {
result = createBytesMessage(message.getMessageID(), payload.getArray(), payload.getArrayOffset(), payload.getLength(), coreMessageObjectPools);
result = createBytesMessage(messageId, payload.getArray(), payload.getArrayOffset(), payload.getLength(), coreMessageObjectPools);
}
}
result.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_DATA);
} else if (body instanceof AmqpSequence) {
AmqpSequence sequence = (AmqpSequence) body;
ServerJMSStreamMessage m = createStreamMessage(message.getMessageID(), coreMessageObjectPools);
ServerJMSStreamMessage m = createStreamMessage(messageId, coreMessageObjectPools);
for (Object item : sequence.getValue()) {
m.writeObject(item);
}
@ -155,35 +161,35 @@ public class AmqpCoreConverter {
} else if (body instanceof AmqpValue) {
Object value = ((AmqpValue) body).getValue();
if (value == null || value instanceof String) {
result = createTextMessage(message.getMessageID(), (String) value, coreMessageObjectPools);
result = createTextMessage(messageId, (String) value, coreMessageObjectPools);
result.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, value == null ? AMQP_VALUE_NULL : AMQP_VALUE_STRING);
} else if (value instanceof Binary) {
Binary payload = (Binary) value;
if (isContentType(SERIALIZED_JAVA_OBJECT_CONTENT_TYPE.toString(), message.getProtonMessage())) {
result = createObjectMessage(message.getMessageID(), payload, coreMessageObjectPools);
if (isContentType(SERIALIZED_JAVA_OBJECT_CONTENT_TYPE.toString(), contentType)) {
result = createObjectMessage(messageId, payload, coreMessageObjectPools);
} else {
result = createBytesMessage(message.getMessageID(), payload.getArray(), payload.getArrayOffset(), payload.getLength(), coreMessageObjectPools);
result = createBytesMessage(messageId, payload.getArray(), payload.getArrayOffset(), payload.getLength(), coreMessageObjectPools);
}
result.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_VALUE_BINARY);
} else if (value instanceof List) {
ServerJMSStreamMessage m = createStreamMessage(message.getMessageID(), coreMessageObjectPools);
ServerJMSStreamMessage m = createStreamMessage(messageId, coreMessageObjectPools);
for (Object item : (List<Object>) value) {
m.writeObject(item);
}
result = m;
result.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_VALUE_LIST);
} else if (value instanceof Map) {
result = createMapMessage(message.getMessageID(), (Map<String, Object>) value, coreMessageObjectPools);
result = createMapMessage(messageId, (Map<String, Object>) value, coreMessageObjectPools);
result.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_VALUE_MAP);
} else {
ByteBuf buf = PooledByteBufAllocator.DEFAULT.heapBuffer(1024);
try {
TLSEncode.getEncoder().setByteBuffer(new NettyWritable(buf));
TLSEncode.getEncoder().writeObject(body);
result = createBytesMessage(message.getMessageID(), buf.array(), 0, buf.writerIndex(), coreMessageObjectPools);
result = createBytesMessage(messageId, buf.array(), 0, buf.writerIndex(), coreMessageObjectPools);
} finally {
buf.release();
TLSEncode.getEncoder().setByteBuffer((WritableBuffer)null);
@ -193,30 +199,38 @@ public class AmqpCoreConverter {
throw new RuntimeException("Unexpected body type: " + body.getClass());
}
TypedProperties properties = message.getExtraProperties();
if (properties != null) {
for (SimpleString str : properties.getPropertyNames()) {
if (str.equals(AMQPMessage.ADDRESS_PROPERTY)) {
continue;
}
result.getInnerMessage().putObjectProperty(str, properties.getProperty(str));
processHeader(result, header);
processMessageAnnotations(result, annotations);
processApplicationProperties(result, applicationProperties);
processProperties(result, properties);
processFooter(result, footer);
processExtraProperties(result, message.getExtraProperties());
// If the JMS expiration has not yet been set...
if (header != null && result.getJMSExpiration() == 0) {
// Then lets try to set it based on the message TTL.
long ttl = javax.jms.Message.DEFAULT_TIME_TO_LIVE;
if (header.getTtl() != null) {
ttl = header.getTtl().longValue();
}
if (ttl == 0) {
result.setJMSExpiration(0);
} else {
result.setJMSExpiration(System.currentTimeMillis() + ttl);
}
}
populateMessage(result, message.getProtonMessage());
result.getInnerMessage().setReplyTo(message.getReplyTo());
result.getInnerMessage().setDurable(message.isDurable());
result.getInnerMessage().setPriority(message.getPriority());
result.getInnerMessage().setAddress(message.getAddressSimpleString());
result.encode();
return result != null ? result.getInnerMessage() : null;
return result.getInnerMessage();
}
@SuppressWarnings("unchecked")
protected static ServerJMSMessage populateMessage(ServerJMSMessage jms, org.apache.qpid.proton.message.Message amqp) throws Exception {
Header header = amqp.getHeader();
protected static ServerJMSMessage processHeader(ServerJMSMessage jms, Header header) throws Exception {
if (header != null) {
jms.setBooleanProperty(JMS_AMQP_HEADER, true);
@ -248,9 +262,12 @@ public class AmqpCoreConverter {
jms.setJMSDeliveryMode(DeliveryMode.NON_PERSISTENT);
}
final MessageAnnotations ma = amqp.getMessageAnnotations();
if (ma != null) {
for (Map.Entry<?, ?> entry : ma.getValue().entrySet()) {
return jms;
}
protected static ServerJMSMessage processMessageAnnotations(ServerJMSMessage jms, MessageAnnotations annotations) throws Exception {
if (annotations != null && annotations.getValue() != null) {
for (Map.Entry<?, ?> entry : annotations.getValue().entrySet()) {
String key = entry.getKey().toString();
if ("x-opt-delivery-time".equals(key) && entry.getValue() != null) {
long deliveryTime = ((Number) entry.getValue()).longValue();
@ -266,14 +283,33 @@ public class AmqpCoreConverter {
}
}
final ApplicationProperties ap = amqp.getApplicationProperties();
if (ap != null) {
for (Map.Entry<String, Object> entry : (Set<Map.Entry<String, Object>>) ap.getValue().entrySet()) {
return jms;
}
private static ServerJMSMessage processApplicationProperties(ServerJMSMessage jms, ApplicationProperties properties) throws Exception {
if (properties != null && properties.getValue() != null) {
for (Map.Entry<String, Object> entry : (Set<Map.Entry<String, Object>>) properties.getValue().entrySet()) {
setProperty(jms, entry.getKey(), entry.getValue());
}
}
final Properties properties = amqp.getProperties();
return jms;
}
private static ServerJMSMessage processExtraProperties(ServerJMSMessage jms, TypedProperties properties) {
if (properties != null) {
for (SimpleString str : properties.getPropertyNames()) {
if (str.equals(AMQPMessage.ADDRESS_PROPERTY)) {
continue;
}
jms.getInnerMessage().putObjectProperty(str, properties.getProperty(str));
}
}
return jms;
}
private static ServerJMSMessage processProperties(ServerJMSMessage jms, Properties properties) throws Exception {
if (properties != null) {
if (properties.getMessageId() != null) {
jms.setJMSMessageID(AMQPMessageIdHelper.INSTANCE.toMessageIdString(properties.getMessageId()));
@ -317,24 +353,13 @@ public class AmqpCoreConverter {
}
}
// If the jms expiration has not yet been set...
if (header != null && jms.getJMSExpiration() == 0) {
// Then lets try to set it based on the message ttl.
long ttl = javax.jms.Message.DEFAULT_TIME_TO_LIVE;
if (header.getTtl() != null) {
ttl = header.getTtl().longValue();
}
return jms;
}
if (ttl == 0) {
jms.setJMSExpiration(0);
} else {
jms.setJMSExpiration(System.currentTimeMillis() + ttl);
}
}
final Footer fp = amqp.getFooter();
if (fp != null) {
for (Map.Entry<Object, Object> entry : (Set<Map.Entry<Object, Object>>) fp.getValue().entrySet()) {
@SuppressWarnings("unchecked")
private static ServerJMSMessage processFooter(ServerJMSMessage jms, Footer footer) throws Exception {
if (footer != null && footer.getValue() != null) {
for (Map.Entry<Object, Object> entry : (Set<Map.Entry<Object, Object>>) footer.getValue().entrySet()) {
String key = entry.getKey().toString();
setProperty(jms, JMS_AMQP_FOOTER_PREFIX + key, entry.getValue());
}

View File

@ -117,11 +117,9 @@ public class NettyWritable implements WritableBuffer {
@Override
public void put(ReadableBuffer buffer) {
if (buffer.hasArray()) {
nettyBuffer.writeBytes(buffer.array(), buffer.arrayOffset(), buffer.remaining());
nettyBuffer.writeBytes(buffer.array(), buffer.arrayOffset() + buffer.position(), buffer.remaining());
} else {
while (buffer.hasRemaining()) {
nettyBuffer.writeByte(buffer.get());
}
nettyBuffer.writeBytes(buffer.byteBuffer());
}
}
}

View File

@ -29,6 +29,8 @@ import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMapMessa
import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMessage;
import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSStreamMessage;
import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSTextMessage;
import org.apache.activemq.artemis.protocol.amqp.util.NettyReadable;
import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable;
import org.apache.activemq.artemis.utils.collections.TypedProperties;
import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.messaging.AmqpSequence;
@ -40,6 +42,8 @@ import org.apache.qpid.proton.message.impl.MessageImpl;
import org.junit.Assert;
import org.junit.Test;
import io.netty.buffer.Unpooled;
public class TestConversions extends Assert {
@ -58,12 +62,11 @@ public class TestConversions extends Assert {
message.setBody(new AmqpValue(new Boolean(true)));
AMQPMessage encodedMessage = new AMQPMessage(message);
AMQPMessage encodedMessage = encodeAndCreateAMQPMessage(message);
ICoreMessage serverMessage = encodedMessage.toCore();
verifyProperties(ServerJMSMessage.wrapCoreMessage(serverMessage));
}
@Test
@ -81,7 +84,7 @@ public class TestConversions extends Assert {
message.setBody(new Data(new Binary(bodyBytes)));
AMQPMessage encodedMessage = new AMQPMessage(message);
AMQPMessage encodedMessage = encodeAndCreateAMQPMessage(message);
ICoreMessage serverMessage = encodedMessage.toCore();
@ -96,7 +99,6 @@ public class TestConversions extends Assert {
bytesMessage.readBytes(newBodyBytes);
Assert.assertArrayEquals(bodyBytes, newBodyBytes);
}
private void verifyProperties(javax.jms.Message message) throws Exception {
@ -135,7 +137,7 @@ public class TestConversions extends Assert {
message.setBody(new AmqpValue(mapValues));
AMQPMessage encodedMessage = new AMQPMessage(message);
AMQPMessage encodedMessage = encodeAndCreateAMQPMessage(message);
ICoreMessage serverMessage = encodedMessage.toCore();
serverMessage.getReadOnlyBodyBuffer();
@ -145,11 +147,11 @@ public class TestConversions extends Assert {
verifyProperties(mapMessage);
Assert.assertEquals(1, mapMessage.getInt("someint"));
Assert.assertEquals("value", mapMessage.getString("somestr"));
assertEquals(1, mapMessage.getInt("someint"));
assertEquals("value", mapMessage.getString("somestr"));
AMQPMessage newAMQP = CoreAmqpConverter.fromCore(mapMessage.getInnerMessage());
System.out.println(newAMQP.getProtonMessage().getBody());
assertNotNull(newAMQP.getBody());
}
@Test
@ -165,7 +167,7 @@ public class TestConversions extends Assert {
message.setBody(new AmqpSequence(objects));
AMQPMessage encodedMessage = new AMQPMessage(message);
AMQPMessage encodedMessage = encodeAndCreateAMQPMessage(message);
ICoreMessage serverMessage = encodedMessage.toCore();
@ -189,7 +191,7 @@ public class TestConversions extends Assert {
String text = "someText";
message.setBody(new AmqpValue(text));
AMQPMessage encodedMessage = new AMQPMessage(message);
AMQPMessage encodedMessage = encodeAndCreateAMQPMessage(message);
ICoreMessage serverMessage = encodedMessage.toCore();
@ -198,8 +200,7 @@ public class TestConversions extends Assert {
verifyProperties(textMessage);
Assert.assertEquals(text, textMessage.getText());
assertEquals(text, textMessage.getText());
}
@Test
@ -209,7 +210,7 @@ public class TestConversions extends Assert {
String text = "someText";
message.setBody(new AmqpValue(text));
AMQPMessage encodedMessage = new AMQPMessage(message);
AMQPMessage encodedMessage = encodeAndCreateAMQPMessage(message);
TypedProperties extraProperties = createTypedPropertiesMap();
extraProperties.putBytesProperty(new SimpleString("bytesProp"), "value".getBytes());
encodedMessage.setExtraProperties(extraProperties);
@ -222,8 +223,15 @@ public class TestConversions extends Assert {
verifyProperties(textMessage);
assertEquals("value", new String(((byte[]) textMessage.getObjectProperty("bytesProp"))));
Assert.assertEquals(text, textMessage.getText());
assertEquals(text, textMessage.getText());
}
private AMQPMessage encodeAndCreateAMQPMessage(MessageImpl message) {
NettyWritable encoded = new NettyWritable(Unpooled.buffer(1024));
message.encode(encoded);
NettyReadable readable = new NettyReadable(encoded.getByteBuf());
return new AMQPMessage(AMQPMessage.DEFAULT_MESSAGE_FORMAT, readable, null, null);
}
}

View File

@ -16,12 +16,11 @@
*/
package org.apache.activemq.artemis.protocol.amqp.converter.message;
import javax.jms.Destination;
import javax.jms.Queue;
import javax.jms.TemporaryQueue;
import javax.jms.TemporaryTopic;
import javax.jms.TextMessage;
import javax.jms.Topic;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
@ -30,6 +29,13 @@ import java.util.List;
import java.util.Map;
import java.util.UUID;
import javax.jms.Destination;
import javax.jms.Queue;
import javax.jms.TemporaryQueue;
import javax.jms.TemporaryTopic;
import javax.jms.TextMessage;
import javax.jms.Topic;
import org.apache.activemq.artemis.api.core.ICoreMessage;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
import org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport;
@ -39,7 +45,8 @@ import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMessage;
import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSObjectMessage;
import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSStreamMessage;
import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSTextMessage;
import org.apache.qpid.proton.Proton;
import org.apache.activemq.artemis.protocol.amqp.util.NettyReadable;
import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable;
import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.AmqpSequence;
@ -47,13 +54,11 @@ import org.apache.qpid.proton.amqp.messaging.AmqpValue;
import org.apache.qpid.proton.amqp.messaging.Data;
import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
import org.apache.qpid.proton.message.Message;
import org.apache.qpid.proton.message.impl.MessageImpl;
import org.junit.Before;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import io.netty.buffer.Unpooled;
public class JMSMappingInboundTransformerTest {
@ -72,10 +77,10 @@ public class JMSMappingInboundTransformerTest {
*/
@Test
public void testCreateBytesMessageFromNoBodySectionAndContentType() throws Exception {
Message message = Message.Factory.create();
MessageImpl message = (MessageImpl) Message.Factory.create();
message.setContentType(AMQPMessageSupport.OCTET_STREAM_CONTENT_TYPE);
AMQPMessage messageEncode = new AMQPMessage(message);
AMQPMessage messageEncode = encodeAndCreateAMQPMessage(message);
ICoreMessage coreMessage = messageEncode.toCore();
@ -94,9 +99,9 @@ public class JMSMappingInboundTransformerTest {
*/
@Test
public void testCreateBytesMessageFromNoBodySectionAndNoContentType() throws Exception {
Message message = Message.Factory.create();
MessageImpl message = (MessageImpl) Message.Factory.create();
javax.jms.Message jmsMessage = ServerJMSMessage.wrapCoreMessage(new AMQPMessage(message).toCore());
javax.jms.Message jmsMessage = ServerJMSMessage.wrapCoreMessage(encodeAndCreateAMQPMessage(message).toCore());
assertNotNull("Message should not be null", jmsMessage);
assertEquals("Unexpected message class type", ServerJMSBytesMessage.class, jmsMessage.getClass());
@ -104,10 +109,10 @@ public class JMSMappingInboundTransformerTest {
@Test
public void testCreateTextMessageFromNoBodySectionAndContentType() throws Exception {
Message message = Message.Factory.create();
MessageImpl message = (MessageImpl) Message.Factory.create();
message.setContentType("text/plain");
javax.jms.Message jmsMessage = ServerJMSMessage.wrapCoreMessage(new AMQPMessage(message).toCore());
javax.jms.Message jmsMessage = ServerJMSMessage.wrapCoreMessage(encodeAndCreateAMQPMessage(message).toCore());
assertNotNull("Message should not be null", jmsMessage);
assertEquals("Unexpected message class type", ServerJMSTextMessage.class, jmsMessage.getClass());
@ -125,12 +130,13 @@ public class JMSMappingInboundTransformerTest {
*/
@Test
public void testCreateBytesMessageFromDataWithEmptyBinaryAndContentType() throws Exception {
Message message = Proton.message();
MessageImpl message = (MessageImpl) Message.Factory.create();
Binary binary = new Binary(new byte[0]);
message.setBody(new Data(binary));
message.setContentType(AMQPMessageSupport.OCTET_STREAM_CONTENT_TYPE);
javax.jms.Message jmsMessage = ServerJMSMessage.wrapCoreMessage(new AMQPMessage(message).toCore());
AMQPMessage amqp = encodeAndCreateAMQPMessage(message);
javax.jms.Message jmsMessage = ServerJMSMessage.wrapCoreMessage(amqp.toCore());
assertNotNull("Message should not be null", jmsMessage);
assertEquals("Unexpected message class type", ServerJMSBytesMessage.class, jmsMessage.getClass());
@ -145,12 +151,12 @@ public class JMSMappingInboundTransformerTest {
* if an error occurs during the test.
*/
public void testCreateBytesMessageFromDataWithUnknownContentType() throws Exception {
Message message = Proton.message();
MessageImpl message = (MessageImpl) Message.Factory.create();
Binary binary = new Binary(new byte[0]);
message.setBody(new Data(binary));
message.setContentType("unknown-content-type");
javax.jms.Message jmsMessage = ServerJMSMessage.wrapCoreMessage(new AMQPMessage(message).toCore());
javax.jms.Message jmsMessage = ServerJMSMessage.wrapCoreMessage(encodeAndCreateAMQPMessage(message).toCore());
assertNotNull("Message should not be null", jmsMessage);
assertEquals("Unexpected message class type", ServerJMSBytesMessage.class, jmsMessage.getClass());
@ -165,13 +171,13 @@ public class JMSMappingInboundTransformerTest {
*/
@Test
public void testCreateBytesMessageFromDataWithEmptyBinaryAndNoContentType() throws Exception {
Message message = Proton.message();
MessageImpl message = (MessageImpl) Message.Factory.create();
Binary binary = new Binary(new byte[0]);
message.setBody(new Data(binary));
assertNull(message.getContentType());
javax.jms.Message jmsMessage = ServerJMSMessage.wrapCoreMessage(new AMQPMessage(message).toCore());
javax.jms.Message jmsMessage = ServerJMSMessage.wrapCoreMessage(encodeAndCreateAMQPMessage(message).toCore());
assertNotNull("Message should not be null", jmsMessage);
assertEquals("Unexpected message class type", ServerJMSBytesMessage.class, jmsMessage.getClass());
@ -187,12 +193,12 @@ public class JMSMappingInboundTransformerTest {
*/
@Test
public void testCreateObjectMessageFromDataWithContentTypeAndEmptyBinary() throws Exception {
Message message = Proton.message();
MessageImpl message = (MessageImpl) Message.Factory.create();
Binary binary = new Binary(new byte[0]);
message.setBody(new Data(binary));
message.setContentType(AMQPMessageSupport.SERIALIZED_JAVA_OBJECT_CONTENT_TYPE.toString());
javax.jms.Message jmsMessage = ServerJMSMessage.wrapCoreMessage(new AMQPMessage(message).toCore());
javax.jms.Message jmsMessage = ServerJMSMessage.wrapCoreMessage(encodeAndCreateAMQPMessage(message).toCore());
assertNotNull("Message should not be null", jmsMessage);
assertEquals("Unexpected message class type", ServerJMSObjectMessage.class, jmsMessage.getClass());
@ -287,12 +293,12 @@ public class JMSMappingInboundTransformerTest {
}
private void doCreateTextMessageFromDataWithContentTypeTestImpl(String contentType, Charset expectedCharset) throws Exception {
Message message = Proton.message();
MessageImpl message = (MessageImpl) Message.Factory.create();
Binary binary = new Binary(new byte[0]);
message.setBody(new Data(binary));
message.setContentType(contentType);
javax.jms.Message jmsMessage = ServerJMSMessage.wrapCoreMessage(new AMQPMessage(message).toCore());
javax.jms.Message jmsMessage = ServerJMSMessage.wrapCoreMessage(encodeAndCreateAMQPMessage(message).toCore());
assertNotNull("Message should not be null", jmsMessage);
if (StandardCharsets.UTF_8.equals(expectedCharset)) {
@ -313,10 +319,10 @@ public class JMSMappingInboundTransformerTest {
*/
@Test
public void testCreateTextMessageFromAmqpValueWithString() throws Exception {
Message message = Proton.message();
MessageImpl message = (MessageImpl) Message.Factory.create();
message.setBody(new AmqpValue("content"));
javax.jms.Message jmsMessage = ServerJMSMessage.wrapCoreMessage(new AMQPMessage(message).toCore());
javax.jms.Message jmsMessage = ServerJMSMessage.wrapCoreMessage(encodeAndCreateAMQPMessage(message).toCore());
assertNotNull("Message should not be null", jmsMessage);
assertEquals("Unexpected message class type", ServerJMSTextMessage.class, jmsMessage.getClass());
@ -331,10 +337,10 @@ public class JMSMappingInboundTransformerTest {
*/
@Test
public void testCreateTextMessageFromAmqpValueWithNull() throws Exception {
Message message = Proton.message();
MessageImpl message = (MessageImpl) Message.Factory.create();
message.setBody(new AmqpValue(null));
javax.jms.Message jmsMessage = ServerJMSMessage.wrapCoreMessage(new AMQPMessage(message).toCore());
javax.jms.Message jmsMessage = ServerJMSMessage.wrapCoreMessage(encodeAndCreateAMQPMessage(message).toCore());
assertNotNull("Message should not be null", jmsMessage);
assertEquals("Unexpected message class type", ServerJMSTextMessage.class, jmsMessage.getClass());
@ -350,11 +356,11 @@ public class JMSMappingInboundTransformerTest {
*/
@Test
public void testCreateObjectMessageFromAmqpValueWithBinaryAndContentType() throws Exception {
Message message = Message.Factory.create();
MessageImpl message = (MessageImpl) Message.Factory.create();
message.setBody(new AmqpValue(new Binary(new byte[0])));
message.setContentType(AMQPMessageSupport.SERIALIZED_JAVA_OBJECT_CONTENT_TYPE.toString());
javax.jms.Message jmsMessage = ServerJMSMessage.wrapCoreMessage(new AMQPMessage(message).toCore());
javax.jms.Message jmsMessage = ServerJMSMessage.wrapCoreMessage(encodeAndCreateAMQPMessage(message).toCore());
assertNotNull("Message should not be null", jmsMessage);
assertEquals("Unexpected message class type", ServerJMSObjectMessage.class, jmsMessage.getClass());
@ -369,11 +375,11 @@ public class JMSMappingInboundTransformerTest {
*/
@Test
public void testCreateAmqpMapMessageFromAmqpValueWithMap() throws Exception {
Message message = Proton.message();
MessageImpl message = (MessageImpl) Message.Factory.create();
Map<String, String> map = new HashMap<>();
message.setBody(new AmqpValue(map));
javax.jms.Message jmsMessage = ServerJMSMessage.wrapCoreMessage(new AMQPMessage(message).toCore());
javax.jms.Message jmsMessage = ServerJMSMessage.wrapCoreMessage(encodeAndCreateAMQPMessage(message).toCore());
assertNotNull("Message should not be null", jmsMessage);
assertEquals("Unexpected message class type", ServerJMSMapMessage.class, jmsMessage.getClass());
@ -388,11 +394,11 @@ public class JMSMappingInboundTransformerTest {
*/
@Test
public void testCreateAmqpStreamMessageFromAmqpValueWithList() throws Exception {
Message message = Proton.message();
MessageImpl message = (MessageImpl) Message.Factory.create();
List<String> list = new ArrayList<>();
message.setBody(new AmqpValue(list));
javax.jms.Message jmsMessage = ServerJMSMessage.wrapCoreMessage(new AMQPMessage(message).toCore());
javax.jms.Message jmsMessage = ServerJMSMessage.wrapCoreMessage(encodeAndCreateAMQPMessage(message).toCore());
assertNotNull("Message should not be null", jmsMessage);
assertEquals("Unexpected message class type", ServerJMSStreamMessage.class, jmsMessage.getClass());
@ -407,11 +413,11 @@ public class JMSMappingInboundTransformerTest {
*/
@Test
public void testCreateAmqpStreamMessageFromAmqpSequence() throws Exception {
Message message = Proton.message();
MessageImpl message = (MessageImpl) Message.Factory.create();
List<String> list = new ArrayList<>();
message.setBody(new AmqpSequence(list));
javax.jms.Message jmsMessage = ServerJMSMessage.wrapCoreMessage(new AMQPMessage(message).toCore());
javax.jms.Message jmsMessage = ServerJMSMessage.wrapCoreMessage(encodeAndCreateAMQPMessage(message).toCore());
assertNotNull("Message should not be null", jmsMessage);
assertEquals("Unexpected message class type", ServerJMSStreamMessage.class, jmsMessage.getClass());
@ -426,11 +432,11 @@ public class JMSMappingInboundTransformerTest {
*/
@Test
public void testCreateAmqpBytesMessageFromAmqpValueWithBinary() throws Exception {
Message message = Proton.message();
MessageImpl message = (MessageImpl) Message.Factory.create();
Binary binary = new Binary(new byte[0]);
message.setBody(new AmqpValue(binary));
javax.jms.Message jmsMessage = ServerJMSMessage.wrapCoreMessage(new AMQPMessage(message).toCore());
javax.jms.Message jmsMessage = ServerJMSMessage.wrapCoreMessage(encodeAndCreateAMQPMessage(message).toCore());
assertNotNull("Message should not be null", jmsMessage);
assertEquals("Unexpected message class type", ServerJMSBytesMessage.class, jmsMessage.getClass());
@ -446,10 +452,10 @@ public class JMSMappingInboundTransformerTest {
*/
@Test
public void testCreateBytesMessageFromAmqpValueWithUncategorisedContent() throws Exception {
Message message = Proton.message();
MessageImpl message = (MessageImpl) Message.Factory.create();
message.setBody(new AmqpValue(UUID.randomUUID()));
javax.jms.Message jmsMessage = ServerJMSMessage.wrapCoreMessage(new AMQPMessage(message).toCore());
javax.jms.Message jmsMessage = ServerJMSMessage.wrapCoreMessage(encodeAndCreateAMQPMessage(message).toCore());
assertNotNull("Message should not be null", jmsMessage);
assertEquals("Unexpected message class type", ServerJMSBytesMessage.class, jmsMessage.getClass());
@ -458,10 +464,10 @@ public class JMSMappingInboundTransformerTest {
@Test
public void testTransformMessageWithAmqpValueStringCreatesTextMessage() throws Exception {
String contentString = "myTextMessageContent";
Message message = Message.Factory.create();
MessageImpl message = (MessageImpl) Message.Factory.create();
message.setBody(new AmqpValue(contentString));
ServerJMSTextMessage jmsMessage = (ServerJMSTextMessage)ServerJMSMessage.wrapCoreMessage(new AMQPMessage(message).toCore());
ServerJMSTextMessage jmsMessage = (ServerJMSTextMessage)ServerJMSMessage.wrapCoreMessage(encodeAndCreateAMQPMessage(message).toCore());
jmsMessage.decode();
assertTrue("Expected TextMessage", jmsMessage instanceof TextMessage);
@ -504,17 +510,17 @@ public class JMSMappingInboundTransformerTest {
throws Exception {
String toAddress = "toAddress";
Message amqp = Message.Factory.create();
amqp.setBody(new AmqpValue("myTextMessageContent"));
amqp.setAddress(toAddress);
MessageImpl message = (MessageImpl) Message.Factory.create();
message.setBody(new AmqpValue("myTextMessageContent"));
message.setAddress(toAddress);
if (toTypeAnnotationValue != null) {
Map<Symbol, Object> map = new HashMap<>();
map.put(Symbol.valueOf("x-opt-to-type"), toTypeAnnotationValue);
MessageAnnotations ma = new MessageAnnotations(map);
amqp.setMessageAnnotations(ma);
message.setMessageAnnotations(ma);
}
javax.jms.Message jmsMessage = ServerJMSMessage.wrapCoreMessage(new AMQPMessage(amqp).toCore());
javax.jms.Message jmsMessage = ServerJMSMessage.wrapCoreMessage(encodeAndCreateAMQPMessage(message).toCore());
assertTrue("Expected TextMessage", jmsMessage instanceof TextMessage);
}
@ -549,18 +555,26 @@ public class JMSMappingInboundTransformerTest {
throws Exception {
String replyToAddress = "replyToAddress";
Message amqp = Message.Factory.create();
amqp.setBody(new AmqpValue("myTextMessageContent"));
amqp.setReplyTo(replyToAddress);
MessageImpl message = (MessageImpl) Message.Factory.create();
message.setBody(new AmqpValue("myTextMessageContent"));
message.setReplyTo(replyToAddress);
if (replyToTypeAnnotationValue != null) {
Map<Symbol, Object> map = new HashMap<>();
map.put(Symbol.valueOf("x-opt-reply-type"), replyToTypeAnnotationValue);
MessageAnnotations ma = new MessageAnnotations(map);
amqp.setMessageAnnotations(ma);
message.setMessageAnnotations(ma);
}
javax.jms.Message jmsMessage = ServerJMSMessage.wrapCoreMessage(new AMQPMessage(amqp).toCore());
javax.jms.Message jmsMessage = ServerJMSMessage.wrapCoreMessage(encodeAndCreateAMQPMessage(message).toCore());
assertTrue("Expected TextMessage", jmsMessage instanceof TextMessage);
}
private AMQPMessage encodeAndCreateAMQPMessage(MessageImpl message) {
NettyWritable encoded = new NettyWritable(Unpooled.buffer(1024));
message.encode(encoded);
NettyReadable readable = new NettyReadable(encoded.getByteBuf());
return new AMQPMessage(AMQPMessage.DEFAULT_MESSAGE_FORMAT, readable, null, null);
}
}

View File

@ -44,6 +44,7 @@ import javax.jms.JMSException;
import org.apache.activemq.artemis.core.buffers.impl.ResetLimitWrappedActiveMQBuffer;
import org.apache.activemq.artemis.core.message.impl.CoreMessage;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
import org.apache.activemq.artemis.protocol.amqp.converter.AMQPConverter;
import org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport;
import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerDestination;
@ -59,7 +60,6 @@ import org.apache.qpid.proton.amqp.messaging.AmqpSequence;
import org.apache.qpid.proton.amqp.messaging.AmqpValue;
import org.apache.qpid.proton.amqp.messaging.Data;
import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
import org.apache.qpid.proton.message.Message;
import org.junit.Test;
public class JMSMappingOutboundTransformerTest {
@ -79,7 +79,7 @@ public class JMSMappingOutboundTransformerTest {
ServerJMSMessage outbound = createMessage();
outbound.encode();
Message amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage()).getProtonMessage();
AMQPMessage amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage());
assertNull(amqp.getBody());
}
@ -90,7 +90,7 @@ public class JMSMappingOutboundTransformerTest {
outbound.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_NULL);
outbound.encode();
Message amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage()).getProtonMessage();
AMQPMessage amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage());
assertNull(amqp.getBody());
}
@ -104,7 +104,7 @@ public class JMSMappingOutboundTransformerTest {
outbound.writeBytes(expectedPayload);
outbound.encode();
Message amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage()).getProtonMessage();
AMQPMessage amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage());
assertNotNull(amqp.getBody());
assertTrue(amqp.getBody() instanceof Data);
@ -123,7 +123,7 @@ public class JMSMappingOutboundTransformerTest {
outbound.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_VALUE_BINARY);
outbound.encode();
Message amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage()).getProtonMessage();
AMQPMessage amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage());
assertNotNull(amqp.getBody());
assertTrue(amqp.getBody() instanceof AmqpValue);
@ -139,7 +139,7 @@ public class JMSMappingOutboundTransformerTest {
outbound.writeBytes(expectedPayload);
outbound.encode();
Message amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage()).getProtonMessage();
AMQPMessage amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage());
assertNotNull(amqp.getBody());
assertTrue(amqp.getBody() instanceof AmqpValue);
@ -159,7 +159,7 @@ public class JMSMappingOutboundTransformerTest {
ServerJMSMapMessage outbound = createMapMessage();
outbound.encode();
Message amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage()).getProtonMessage();
AMQPMessage amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage());
assertNotNull(amqp.getBody());
assertTrue(amqp.getBody() instanceof AmqpValue);
@ -174,7 +174,7 @@ public class JMSMappingOutboundTransformerTest {
outbound.setBytes("bytes", byteArray);
outbound.encode();
Message amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage()).getProtonMessage();
AMQPMessage amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage());
assertNotNull(amqp.getBody());
assertTrue(amqp.getBody() instanceof AmqpValue);
@ -196,7 +196,7 @@ public class JMSMappingOutboundTransformerTest {
outbound.setBoolean("property-3", true);
outbound.encode();
Message amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage()).getProtonMessage();
AMQPMessage amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage());
assertNotNull(amqp.getBody());
assertTrue(amqp.getBody() instanceof AmqpValue);
@ -218,7 +218,7 @@ public class JMSMappingOutboundTransformerTest {
outbound.writeString("test");
outbound.encode();
Message amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage()).getProtonMessage();
AMQPMessage amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage());
assertNotNull(amqp.getBody());
assertTrue(amqp.getBody() instanceof AmqpValue);
@ -239,7 +239,7 @@ public class JMSMappingOutboundTransformerTest {
outbound.writeString("test");
outbound.encode();
Message amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage()).getProtonMessage();
AMQPMessage amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage());
assertNotNull(amqp.getBody());
assertTrue(amqp.getBody() instanceof AmqpValue);
@ -260,7 +260,7 @@ public class JMSMappingOutboundTransformerTest {
outbound.writeString("test");
outbound.encode();
Message amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage()).getProtonMessage();
AMQPMessage amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage());
assertNotNull(amqp.getBody());
assertTrue(amqp.getBody() instanceof AmqpSequence);
@ -279,7 +279,7 @@ public class JMSMappingOutboundTransformerTest {
ServerJMSObjectMessage outbound = createObjectMessage();
outbound.encode();
Message amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage()).getProtonMessage();
AMQPMessage amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage());
assertNotNull(amqp.getBody());
assertTrue(amqp.getBody() instanceof Data);
@ -292,7 +292,7 @@ public class JMSMappingOutboundTransformerTest {
outbound.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_UNKNOWN);
outbound.encode();
Message amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage()).getProtonMessage();
AMQPMessage amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage());
assertNotNull(amqp.getBody());
assertTrue(amqp.getBody() instanceof Data);
@ -304,7 +304,7 @@ public class JMSMappingOutboundTransformerTest {
ServerJMSObjectMessage outbound = createObjectMessage(TEST_OBJECT_VALUE);
outbound.encode();
Message amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage()).getProtonMessage();
AMQPMessage amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage());
assertNotNull(amqp.getBody());
assertTrue(amqp.getBody() instanceof Data);
@ -321,7 +321,7 @@ public class JMSMappingOutboundTransformerTest {
outbound.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_UNKNOWN);
outbound.encode();
Message amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage()).getProtonMessage();
AMQPMessage amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage());
assertNotNull(amqp.getBody());
assertTrue(amqp.getBody() instanceof Data);
@ -338,7 +338,7 @@ public class JMSMappingOutboundTransformerTest {
outbound.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_VALUE_BINARY);
outbound.encode();
Message amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage()).getProtonMessage();
AMQPMessage amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage());
assertNotNull(amqp.getBody());
assertTrue(amqp.getBody() instanceof AmqpValue);
@ -356,7 +356,7 @@ public class JMSMappingOutboundTransformerTest {
outbound.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_VALUE_BINARY);
outbound.encode();
Message amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage()).getProtonMessage();
AMQPMessage amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage());
assertNotNull(amqp.getBody());
assertTrue(amqp.getBody() instanceof AmqpValue);
@ -371,7 +371,7 @@ public class JMSMappingOutboundTransformerTest {
ServerJMSTextMessage outbound = createTextMessage();
outbound.encode();
Message amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage()).getProtonMessage();
AMQPMessage amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage());
assertNotNull(amqp.getBody());
assertTrue(amqp.getBody() instanceof AmqpValue);
@ -384,7 +384,7 @@ public class JMSMappingOutboundTransformerTest {
ServerJMSTextMessage outbound = createTextMessage(contentString);
outbound.encode();
Message amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage()).getProtonMessage();
AMQPMessage amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage());
assertNotNull(amqp.getBody());
assertTrue(amqp.getBody() instanceof AmqpValue);
@ -397,7 +397,7 @@ public class JMSMappingOutboundTransformerTest {
ServerJMSTextMessage outbound = createTextMessage(contentString);
outbound.encode();
Message amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage()).getProtonMessage();
AMQPMessage amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage());
assertNotNull(amqp.getBody());
assertTrue(amqp.getBody() instanceof AmqpValue);
@ -410,7 +410,7 @@ public class JMSMappingOutboundTransformerTest {
ServerJMSTextMessage outbound = createTextMessage(contentString);
outbound.encode();
Message amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage()).getProtonMessage();
AMQPMessage amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage());
assertNotNull(amqp.getBody());
assertTrue(amqp.getBody() instanceof AmqpValue);
@ -427,7 +427,7 @@ public class JMSMappingOutboundTransformerTest {
outbound.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_DATA);
outbound.encode();
Message amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage()).getProtonMessage();
AMQPMessage amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage());
assertNotNull(amqp.getBody());
assertTrue(amqp.getBody() instanceof Data);
@ -445,7 +445,7 @@ public class JMSMappingOutboundTransformerTest {
outbound.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_DATA);
outbound.encode();
Message amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage()).getProtonMessage();
AMQPMessage amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage());
assertNotNull(amqp.getBody());
assertTrue(amqp.getBody() instanceof Data);
@ -473,7 +473,7 @@ public class JMSMappingOutboundTransformerTest {
textMessage.setText("myTextMessageContent");
textMessage.setJMSDestination(jmsDestination);
Message amqp = AMQPConverter.getInstance().fromCore(textMessage.getInnerMessage()).getProtonMessage();
AMQPMessage amqp = AMQPConverter.getInstance().fromCore(textMessage.getInnerMessage());
MessageAnnotations ma = amqp.getMessageAnnotations();
Map<Symbol, Object> maMap = ma == null ? null : ma.getValue();
@ -506,7 +506,7 @@ public class JMSMappingOutboundTransformerTest {
textMessage.setText("myTextMessageContent");
textMessage.setJMSReplyTo(jmsReplyTo);
Message amqp = AMQPConverter.getInstance().fromCore(textMessage.getInnerMessage()).getProtonMessage();
AMQPMessage amqp = AMQPConverter.getInstance().fromCore(textMessage.getInnerMessage());
MessageAnnotations ma = amqp.getMessageAnnotations();
Map<Symbol, Object> maMap = ma == null ? null : ma.getValue();
@ -518,7 +518,7 @@ public class JMSMappingOutboundTransformerTest {
}
if (jmsReplyTo != null) {
assertEquals("Unexpected 'reply-to' address", jmsReplyTo.getAddress(), amqp.getReplyTo());
assertEquals("Unexpected 'reply-to' address", jmsReplyTo.getSimpleAddress(), amqp.getReplyTo());
}
}

View File

@ -21,23 +21,27 @@ import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import org.apache.activemq.artemis.api.core.ICoreMessage;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
import org.apache.activemq.artemis.protocol.amqp.converter.AMQPConverter;
import org.apache.activemq.artemis.protocol.amqp.util.NettyReadable;
import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable;
import org.apache.qpid.proton.Proton;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.AmqpValue;
import org.apache.qpid.proton.amqp.messaging.ApplicationProperties;
import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
import org.apache.qpid.proton.message.Message;
import org.apache.qpid.proton.message.impl.MessageImpl;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.buffer.Unpooled;
/**
* Some simple performance tests for the Message Transformers.
*/
@ -56,10 +60,9 @@ public class JMSTransformationSpeedComparisonTest {
@Test
public void testBodyOnlyMessage() throws Exception {
Message message = Proton.message();
MessageImpl message = (MessageImpl) Proton.message();
message.setBody(new AmqpValue("String payload for AMQP message conversion performance testing."));
AMQPMessage encoded = new AMQPMessage(message);
AMQPMessage encoded = encodeAndCreateAMQPMessage(message);
// Warm up
for (int i = 0; i < WARM_CYCLES; ++i) {
@ -81,8 +84,7 @@ public class JMSTransformationSpeedComparisonTest {
@Test
public void testMessageWithNoPropertiesOrAnnotations() throws Exception {
Message message = Proton.message();
MessageImpl message = (MessageImpl) Proton.message();
message.setAddress("queue://test-queue");
message.setDeliveryCount(1);
@ -90,7 +92,7 @@ public class JMSTransformationSpeedComparisonTest {
message.setContentType("text/plain");
message.setBody(new AmqpValue("String payload for AMQP message conversion performance testing."));
AMQPMessage encoded = new AMQPMessage(message);
AMQPMessage encoded = encodeAndCreateAMQPMessage(message);
// Warm up
for (int i = 0; i < WARM_CYCLES; ++i) {
@ -112,8 +114,7 @@ public class JMSTransformationSpeedComparisonTest {
@Test
public void testTypicalQpidJMSMessage() throws Exception {
AMQPMessage encoded = new AMQPMessage(createTypicalQpidJMSMessage());
AMQPMessage encoded = encodeAndCreateAMQPMessage(createTypicalQpidJMSMessage());
// Warm up
for (int i = 0; i < WARM_CYCLES; ++i) {
@ -136,7 +137,7 @@ public class JMSTransformationSpeedComparisonTest {
@Test
public void testComplexQpidJMSMessage() throws Exception {
AMQPMessage encoded = encode(createComplexQpidJMSMessage());
AMQPMessage encoded = encodeAndCreateAMQPMessage(createComplexQpidJMSMessage());
// Warm up
for (int i = 0; i < WARM_CYCLES; ++i) {
@ -159,7 +160,7 @@ public class JMSTransformationSpeedComparisonTest {
@Test
public void testTypicalQpidJMSMessageInBoundOnly() throws Exception {
AMQPMessage encoded = encode(createTypicalQpidJMSMessage());
AMQPMessage encoded = encodeAndCreateAMQPMessage(createTypicalQpidJMSMessage());
// Warm up
for (int i = 0; i < WARM_CYCLES; ++i) {
@ -182,8 +183,7 @@ public class JMSTransformationSpeedComparisonTest {
@Test
public void testTypicalQpidJMSMessageOutBoundOnly() throws Exception {
AMQPMessage encoded = encode(createTypicalQpidJMSMessage());
AMQPMessage encoded = encodeAndCreateAMQPMessage(createTypicalQpidJMSMessage());
// Warm up
for (int i = 0; i < WARM_CYCLES; ++i) {
@ -204,7 +204,7 @@ public class JMSTransformationSpeedComparisonTest {
LOG_RESULTS(totalDuration);
}
private Message createTypicalQpidJMSMessage() {
private MessageImpl createTypicalQpidJMSMessage() {
Map<String, Object> applicationProperties = new HashMap<>();
Map<Symbol, Object> messageAnnotations = new HashMap<>();
@ -215,7 +215,7 @@ public class JMSTransformationSpeedComparisonTest {
messageAnnotations.put(Symbol.valueOf("x-opt-jms-msg-type"), 0);
messageAnnotations.put(Symbol.valueOf("x-opt-jms-dest"), 0);
Message message = Proton.message();
MessageImpl message = (MessageImpl) Proton.message();
message.setAddress("queue://test-queue");
message.setDeliveryCount(1);
@ -228,7 +228,7 @@ public class JMSTransformationSpeedComparisonTest {
return message;
}
private Message createComplexQpidJMSMessage() {
private MessageImpl createComplexQpidJMSMessage() {
Map<String, Object> applicationProperties = new HashMap<>();
Map<Symbol, Object> messageAnnotations = new HashMap<>();
@ -245,7 +245,7 @@ public class JMSTransformationSpeedComparisonTest {
messageAnnotations.put(Symbol.valueOf("x-opt-jms-msg-type"), 0);
messageAnnotations.put(Symbol.valueOf("x-opt-jms-dest"), 0);
Message message = Proton.message();
MessageImpl message = (MessageImpl) Proton.message();
// Header Values
message.setPriority((short) 9);
@ -272,8 +272,13 @@ public class JMSTransformationSpeedComparisonTest {
return message;
}
private AMQPMessage encode(Message message) {
return new AMQPMessage(message);
private AMQPMessage encodeAndCreateAMQPMessage(MessageImpl message) {
NettyWritable encoded = new NettyWritable(Unpooled.buffer(1024));
message.encode(encoded);
NettyReadable readable = new NettyReadable(encoded.getByteBuf());
return new AMQPMessage(AMQPMessage.DEFAULT_MESSAGE_FORMAT, readable, null, null);
}
private void encode(AMQPMessage target) {

View File

@ -16,6 +16,11 @@
*/
package org.apache.activemq.artemis.protocol.amqp.converter.message;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
@ -23,20 +28,20 @@ import java.util.Map;
import org.apache.activemq.artemis.api.core.ICoreMessage;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
import org.apache.activemq.artemis.protocol.amqp.converter.AMQPConverter;
import org.apache.activemq.artemis.protocol.amqp.util.NettyReadable;
import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable;
import org.apache.qpid.proton.Proton;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.AmqpValue;
import org.apache.qpid.proton.amqp.messaging.ApplicationProperties;
import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
import org.apache.qpid.proton.message.Message;
import org.junit.Before;
import org.apache.qpid.proton.amqp.messaging.Section;
import org.apache.qpid.proton.message.impl.MessageImpl;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import io.netty.buffer.Unpooled;
/**
* Tests some basic encode / decode functionality on the transformers.
@ -46,34 +51,32 @@ public class MessageTransformationTest {
@Rule
public TestName test = new TestName();
@Before
public void setUp() {
}
@Test
public void testBodyOnlyEncodeDecode() throws Exception {
Message incomingMessage = Proton.message();
MessageImpl incomingMessage = (MessageImpl) Proton.message();
incomingMessage.setBody(new AmqpValue("String payload for AMQP message conversion performance testing."));
ICoreMessage core = new AMQPMessage(incomingMessage).toCore();
Message outboudMessage = AMQPConverter.getInstance().fromCore(core).getProtonMessage();
ICoreMessage core = encodeAndCreateAMQPMessage(incomingMessage).toCore();
AMQPMessage outboudMessage = AMQPConverter.getInstance().fromCore(core);
assertNull(outboudMessage.getHeader());
Section body = outboudMessage.getBody();
assertNotNull(body);
assertTrue(body instanceof AmqpValue);
assertTrue(((AmqpValue) body).getValue() instanceof String);
}
@Test
public void testPropertiesButNoHeadersEncodeDecode() throws Exception {
Message incomingMessage = Proton.message();
MessageImpl incomingMessage = (MessageImpl) Proton.message();
incomingMessage.setBody(new AmqpValue("String payload for AMQP message conversion performance testing."));
incomingMessage.setMessageId("ID:SomeQualifier:0:0:1");
ICoreMessage core = new AMQPMessage(incomingMessage).toCore();
Message outboudMessage = AMQPConverter.getInstance().fromCore(core).getProtonMessage();
ICoreMessage core = encodeAndCreateAMQPMessage(incomingMessage).toCore();
AMQPMessage outboudMessage = AMQPConverter.getInstance().fromCore(core);
assertNull(outboudMessage.getHeader());
assertNotNull(outboudMessage.getProperties());
@ -81,20 +84,24 @@ public class MessageTransformationTest {
@Test
public void testHeaderButNoPropertiesEncodeDecode() throws Exception {
Message incomingMessage = Proton.message();
MessageImpl incomingMessage = (MessageImpl) Proton.message();
incomingMessage.setBody(new AmqpValue("String payload for AMQP message conversion performance testing."));
incomingMessage.setDurable(true);
ICoreMessage core = new AMQPMessage(incomingMessage).toCore();
Message outboudMessage = AMQPConverter.getInstance().fromCore(core).getProtonMessage();
ICoreMessage core = encodeAndCreateAMQPMessage(incomingMessage).toCore();
AMQPMessage outboudMessage = AMQPConverter.getInstance().fromCore(core);
assertNotNull(outboudMessage.getHeader());
Section body = outboudMessage.getBody();
assertNotNull(body);
assertTrue(body instanceof AmqpValue);
assertTrue(((AmqpValue) body).getValue() instanceof String);
}
@Test
public void testComplexQpidJMSMessageEncodeDecode() throws Exception {
Map<String, Object> applicationProperties = new HashMap<>();
Map<Symbol, Object> messageAnnotations = new HashMap<>();
@ -113,7 +120,7 @@ public class MessageTransformationTest {
messageAnnotations.put(Symbol.valueOf("x-opt-jms-reply-to"), 0);
messageAnnotations.put(Symbol.valueOf("x-opt-delivery-delay"), 2000);
Message message = Proton.message();
MessageImpl message = (MessageImpl) Proton.message();
// Header Values
message.setPriority((short) 9);
@ -137,10 +144,19 @@ public class MessageTransformationTest {
message.setMessageAnnotations(new MessageAnnotations(messageAnnotations));
message.setBody(new AmqpValue("String payload for AMQP message conversion performance testing."));
ICoreMessage core = new AMQPMessage(message).toCore();
Message outboudMessage = AMQPConverter.getInstance().fromCore(core).getProtonMessage();
ICoreMessage core = encodeAndCreateAMQPMessage(message).toCore();
AMQPMessage outboudMessage = AMQPConverter.getInstance().fromCore(core);
assertEquals(10, outboudMessage.getApplicationProperties().getValue().size());
assertEquals(4, outboudMessage.getMessageAnnotations().getValue().size());
}
private AMQPMessage encodeAndCreateAMQPMessage(MessageImpl message) {
NettyWritable encoded = new NettyWritable(Unpooled.buffer(1024));
message.encode(encoded);
NettyReadable readable = new NettyReadable(encoded.getByteBuf());
return new AMQPMessage(AMQPMessage.DEFAULT_MESSAGE_FORMAT, readable, null, null);
}
}

View File

@ -1,438 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.protocol.amqp.message;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.nio.charset.StandardCharsets;
import java.util.Date;
import java.util.HashMap;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
import org.apache.activemq.artemis.api.core.ICoreMessage;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessagePersisterV2;
import org.apache.activemq.artemis.protocol.amqp.util.NettyReadable;
import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable;
import org.apache.activemq.artemis.protocol.amqp.util.TLSEncode;
import org.apache.activemq.artemis.spi.core.protocol.EmbedMessageUtil;
import org.apache.activemq.artemis.utils.RandomUtil;
import org.apache.qpid.proton.amqp.UnsignedByte;
import org.apache.qpid.proton.amqp.UnsignedInteger;
import org.apache.qpid.proton.amqp.UnsignedLong;
import org.apache.qpid.proton.amqp.messaging.ApplicationProperties;
import org.apache.qpid.proton.amqp.messaging.Header;
import org.apache.qpid.proton.amqp.messaging.Properties;
import org.apache.qpid.proton.codec.EncoderImpl;
import org.apache.qpid.proton.codec.EncodingCodes;
import org.apache.qpid.proton.codec.ReadableBuffer;
import org.apache.qpid.proton.message.Message;
import org.apache.qpid.proton.message.impl.MessageImpl;
import org.junit.Assert;
import org.junit.Test;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
public class AMQPMessageTest {
@Test
public void testVerySimple() {
MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
protonMessage.setHeader( new Header());
Properties properties = new Properties();
properties.setTo("someNiceLocal");
protonMessage.setProperties(properties);
protonMessage.getHeader().setDeliveryCount(new UnsignedInteger(7));
protonMessage.getHeader().setDurable(Boolean.TRUE);
protonMessage.setApplicationProperties(new ApplicationProperties(new HashMap<>()));
AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
assertEquals(7, decoded.getHeader().getDeliveryCount().intValue());
assertEquals(true, decoded.getHeader().getDurable());
assertEquals("someNiceLocal", decoded.getAddress());
}
@Test
public void testApplicationPropertiesReencode() {
MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
protonMessage.setHeader( new Header());
Properties properties = new Properties();
properties.setTo("someNiceLocal");
protonMessage.setProperties(properties);
protonMessage.getHeader().setDeliveryCount(new UnsignedInteger(7));
protonMessage.getHeader().setDurable(Boolean.TRUE);
HashMap<String, Object> map = new HashMap<>();
map.put("key", "string1");
protonMessage.setApplicationProperties(new ApplicationProperties(map));
AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
assertEquals("someNiceLocal", decoded.getAddress());
decoded.setAddress("newAddress");
decoded.reencode();
assertEquals(7, decoded.getHeader().getDeliveryCount().intValue());
assertEquals(true, decoded.getHeader().getDurable());
assertEquals("newAddress", decoded.getAddress());
assertEquals("string1", decoded.getObjectProperty("key"));
// validate if the message will be the same after delivery
AMQPMessage newDecoded = encodeDelivery(decoded, 3);
assertEquals(2, decoded.getHeader().getDeliveryCount().intValue());
assertEquals(true, newDecoded.getHeader().getDurable());
assertEquals("newAddress", newDecoded.getAddress());
assertEquals("string1", newDecoded.getObjectProperty("key"));
}
@Test
public void testGetAddressFromMessage() {
final String ADDRESS = "myQueue";
MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
protonMessage.setHeader(new Header());
protonMessage.setAddress(ADDRESS);
AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
assertEquals(ADDRESS, decoded.getAddress());
}
@Test
public void testGetAddressSimpleStringFromMessage() {
final String ADDRESS = "myQueue";
MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
protonMessage.setHeader(new Header());
protonMessage.setAddress(ADDRESS);
AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
assertEquals(ADDRESS, decoded.getAddressSimpleString().toString());
}
@Test
public void testGetAddressFromMessageWithNoValueSet() {
MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
assertNull(decoded.getAddress());
assertNull(decoded.getAddressSimpleString());
}
@Test
public void testIsDurableFromMessage() {
MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
protonMessage.setHeader(new Header());
protonMessage.setDurable(true);
AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
assertTrue(decoded.isDurable());
}
@Test
public void testIsDurableFromMessageWithNoValueSet() {
MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
assertFalse(decoded.isDurable());
}
@Test
public void testGetGroupIDFromMessage() {
final String GROUP_ID = "group-1";
MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
protonMessage.setHeader(new Header());
protonMessage.setGroupId(GROUP_ID);
AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
assertEquals(GROUP_ID, decoded.getGroupID().toString());
}
@Test
public void testGetGroupIDFromMessageWithNoGroupId() {
MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
assertNull(decoded.getUserID());
}
@Test
public void testGetUserIDFromMessage() {
final String USER_NAME = "foo";
MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
protonMessage.setHeader(new Header());
protonMessage.setUserId(USER_NAME.getBytes(StandardCharsets.UTF_8));
AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
assertEquals(USER_NAME, decoded.getAMQPUserID());
}
@Test
public void testGetUserIDFromMessageWithNoUserID() {
MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
assertNull(decoded.getUserID());
}
@Test
public void testGetPriorityFromMessage() {
final short PRIORITY = 7;
MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
protonMessage.setHeader(new Header());
protonMessage.setPriority(PRIORITY);
AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
assertEquals(PRIORITY, decoded.getPriority());
}
@Test
public void testGetPriorityFromMessageWithNoPrioritySet() {
MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
assertEquals(AMQPMessage.DEFAULT_MESSAGE_PRIORITY, decoded.getPriority());
}
@Test
public void testGetTimestampFromMessage() {
Date timestamp = new Date(System.currentTimeMillis());
MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
protonMessage.setHeader( new Header());
Properties properties = new Properties();
properties.setCreationTime(timestamp);
protonMessage.setProperties(properties);
AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
assertEquals(timestamp.getTime(), decoded.getTimestamp());
}
@Test
public void testGetTimestampFromMessageWithNoCreateTimeSet() {
MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
protonMessage.setHeader( new Header());
AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
assertEquals(0L, decoded.getTimestamp());
}
@Test
public void testExtraProperty() {
MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
byte[] original = RandomUtil.randomBytes();
SimpleString name = SimpleString.toSimpleString("myProperty");
AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
decoded.setAddress("someAddress");
decoded.setMessageID(33);
decoded.putExtraBytesProperty(name, original);
ICoreMessage coreMessage = decoded.toCore();
Assert.assertEquals(original, coreMessage.getBytesProperty(name));
ActiveMQBuffer buffer = ActiveMQBuffers.pooledBuffer(10 * 1024);
try {
decoded.getPersister().encode(buffer, decoded);
Assert.assertEquals(AMQPMessagePersisterV2.getInstance().getID(), buffer.readByte()); // the journal reader will read 1 byte to find the persister
AMQPMessage readMessage = (AMQPMessage)decoded.getPersister().decode(buffer, null);
Assert.assertEquals(33, readMessage.getMessageID());
Assert.assertEquals("someAddress", readMessage.getAddress());
Assert.assertArrayEquals(original, readMessage.getExtraBytesProperty(name));
} finally {
buffer.release();
}
{
ICoreMessage embeddedMessage = EmbedMessageUtil.embedAsCoreMessage(decoded);
AMQPMessage readMessage = (AMQPMessage) EmbedMessageUtil.extractEmbedded(embeddedMessage);
Assert.assertEquals(33, readMessage.getMessageID());
Assert.assertEquals("someAddress", readMessage.getAddress());
Assert.assertArrayEquals(original, readMessage.getExtraBytesProperty(name));
}
}
private static final UnsignedLong AMQPVALUE_DESCRIPTOR = UnsignedLong.valueOf(0x0000000000000077L);
private static final UnsignedLong APPLICATION_PROPERTIES_DESCRIPTOR = UnsignedLong.valueOf(0x0000000000000074L);
private static final UnsignedLong DELIVERY_ANNOTATIONS_DESCRIPTOR = UnsignedLong.valueOf(0x0000000000000071L);
@Test
public void testPartialDecodeIgnoresDeliveryAnnotationsByDefault() {
Header header = new Header();
header.setDurable(true);
header.setPriority(UnsignedByte.valueOf((byte) 6));
ByteBuf encodedBytes = Unpooled.buffer(1024);
NettyWritable writable = new NettyWritable(encodedBytes);
EncoderImpl encoder = TLSEncode.getEncoder();
encoder.setByteBuffer(writable);
encoder.writeObject(header);
// Signal body of AmqpValue but write corrupt underlying type info
encodedBytes.writeByte(EncodingCodes.DESCRIBED_TYPE_INDICATOR);
encodedBytes.writeByte(EncodingCodes.SMALLULONG);
encodedBytes.writeByte(DELIVERY_ANNOTATIONS_DESCRIPTOR.byteValue());
encodedBytes.writeByte(EncodingCodes.MAP8);
encodedBytes.writeByte(2); // Size
encodedBytes.writeByte(2); // Elements
// Use bad encoding code on underlying type of map key which will fail the decode if run
encodedBytes.writeByte(255);
ReadableBuffer readable = new NettyReadable(encodedBytes);
AMQPMessage message = null;
try {
message = new AMQPMessage(0, readable, null, null);
} catch (Exception decodeError) {
fail("Should not have encountered an exception on partial decode: " + decodeError.getMessage());
}
try {
// This should perform the lazy decode of the DeliveryAnnotations portion of the message
message.reencode();
fail("Should have thrown an error when attempting to decode the ApplicationProperties which are malformed.");
} catch (Exception ex) {
// Expected decode to fail when building full message.
}
}
@Test
public void testPartialDecodeIgnoresApplicationPropertiesByDefault() {
Header header = new Header();
header.setDurable(true);
header.setPriority(UnsignedByte.valueOf((byte) 6));
ByteBuf encodedBytes = Unpooled.buffer(1024);
NettyWritable writable = new NettyWritable(encodedBytes);
EncoderImpl encoder = TLSEncode.getEncoder();
encoder.setByteBuffer(writable);
encoder.writeObject(header);
// Signal body of AmqpValue but write corrupt underlying type info
encodedBytes.writeByte(EncodingCodes.DESCRIBED_TYPE_INDICATOR);
encodedBytes.writeByte(EncodingCodes.SMALLULONG);
encodedBytes.writeByte(APPLICATION_PROPERTIES_DESCRIPTOR.byteValue());
// Use bad encoding code on underlying type
encodedBytes.writeByte(255);
ReadableBuffer readable = new NettyReadable(encodedBytes);
AMQPMessage message = null;
try {
message = new AMQPMessage(0, readable, null, null);
} catch (Exception decodeError) {
fail("Should not have encountered an exception on partial decode: " + decodeError.getMessage());
}
assertTrue(message.isDurable());
try {
// This should perform the lazy decode of the ApplicationProperties portion of the message
message.getStringProperty("test");
fail("Should have thrown an error when attempting to decode the ApplicationProperties which are malformed.");
} catch (Exception ex) {
// Expected decode to fail when building full message.
}
}
@Test
public void testPartialDecodeIgnoresBodyByDefault() {
Header header = new Header();
header.setDurable(true);
header.setPriority(UnsignedByte.valueOf((byte) 6));
ByteBuf encodedBytes = Unpooled.buffer(1024);
NettyWritable writable = new NettyWritable(encodedBytes);
EncoderImpl encoder = TLSEncode.getEncoder();
encoder.setByteBuffer(writable);
encoder.writeObject(header);
// Signal body of AmqpValue but write corrupt underlying type info
encodedBytes.writeByte(EncodingCodes.DESCRIBED_TYPE_INDICATOR);
encodedBytes.writeByte(EncodingCodes.SMALLULONG);
encodedBytes.writeByte(AMQPVALUE_DESCRIPTOR.byteValue());
// Use bad encoding code on underlying type
encodedBytes.writeByte(255);
ReadableBuffer readable = new NettyReadable(encodedBytes);
AMQPMessage message = null;
try {
message = new AMQPMessage(0, readable, null, null);
} catch (Exception decodeError) {
fail("Should not have encountered an exception on partial decode: " + decodeError.getMessage());
}
assertTrue(message.isDurable());
try {
// This will decode the body section if present in order to present it as a Proton Message object
message.getProtonMessage();
fail("Should have thrown an error when attempting to decode the body which is malformed.");
} catch (Exception ex) {
// Expected decode to fail when building full message.
}
}
private AMQPMessage encodeAndDecodeMessage(MessageImpl message) {
ByteBuf nettyBuffer = Unpooled.buffer(1500);
message.encode(new NettyWritable(nettyBuffer));
byte[] bytes = new byte[nettyBuffer.writerIndex()];
nettyBuffer.readBytes(bytes);
return new AMQPMessage(0, bytes, null);
}
private AMQPMessage encodeDelivery(AMQPMessage message, int deliveryCount) {
ByteBuf nettyBuffer = Unpooled.buffer(1500);
message.sendBuffer(nettyBuffer, deliveryCount);
byte[] bytes = new byte[nettyBuffer.writerIndex()];
nettyBuffer.readBytes(bytes);
return new AMQPMessage(0, bytes, null);
}
}

View File

@ -22,6 +22,7 @@ import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import java.nio.ByteBuffer;
import java.util.Arrays;
import org.apache.qpid.proton.codec.ReadableBuffer;
import org.junit.Test;
@ -125,6 +126,28 @@ public class NettyWritableTest {
doPutReadableBufferTestImpl(false);
}
@Test
public void testPutReadableBufferWithOffsetAndNonZeroPosition() {
ByteBuf buffer = Unpooled.buffer(1024);
NettyWritable writable = new NettyWritable(buffer);
ByteBuffer source = ByteBuffer.allocate(20);
source.put(new byte[] {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19});
source.position(5);
source.limit(10);
writable.put(source);
assertEquals(5, writable.position());
assertEquals(5, buffer.readableBytes());
byte[] check = new byte[5];
buffer.readBytes(check);
assertTrue(Arrays.equals(new byte[] {5, 6, 7, 8, 9}, check));
}
private void doPutReadableBufferTestImpl(boolean readOnly) {
ByteBuffer buf = ByteBuffer.allocate(1024);
buf.put((byte) 1);

View File

@ -27,7 +27,6 @@ import org.apache.activemq.transport.amqp.client.AmqpMessage;
import org.apache.activemq.transport.amqp.client.AmqpReceiver;
import org.apache.activemq.transport.amqp.client.AmqpSender;
import org.apache.activemq.transport.amqp.client.AmqpSession;
import org.junit.Assert;
import org.junit.Test;
public class AmqpExpiredMessageTest extends AmqpClientTestSupport {
@ -50,7 +49,7 @@ public class AmqpExpiredMessageTest extends AmqpClientTestSupport {
sender.send(message);
sender.close();
assertEquals(1, queueView.getMessageCount());
Wait.assertEquals(1, queueView::getMessageCount);
// Now try and get the message
AmqpReceiver receiver = session.createReceiver(getQueueName());
@ -101,6 +100,10 @@ public class AmqpExpiredMessageTest extends AmqpClientTestSupport {
server.stop();
server.start();
final Queue dlqView = getProxyToQueue(getDeadLetterAddress());
assertNotNull(dlqView);
Wait.assertEquals(1, dlqView::getMessageCount);
client = createAmqpClient();
connection = addConnection(client.connect());
session = connection.createSession();
@ -108,10 +111,11 @@ public class AmqpExpiredMessageTest extends AmqpClientTestSupport {
AmqpReceiver receiverDLQ = session.createReceiver(getDeadLetterAddress());
receiverDLQ.flow(1);
received = receiverDLQ.receive(5, TimeUnit.SECONDS);
Assert.assertEquals(1, received.getTimeToLive());
System.out.println("received.heandler.TTL" + received.getTimeToLive());
Assert.assertNotNull(received);
Assert.assertEquals("Value1", received.getApplicationProperty("key1"));
assertNotNull("Should have read message from DLQ", received);
assertEquals(0, received.getTimeToLive());
assertNotNull(received);
assertEquals("Value1", received.getApplicationProperty("key1"));
connection.close();
}

View File

@ -29,11 +29,16 @@ import org.apache.activemq.artemis.core.protocol.core.impl.CoreProtocolManagerFa
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
import org.apache.activemq.artemis.protocol.amqp.broker.ProtonProtocolManagerFactory;
import org.apache.activemq.artemis.protocol.amqp.util.NettyReadable;
import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.qpid.proton.message.Message;
import org.apache.qpid.proton.message.impl.MessageImpl;
import org.junit.Assert;
import org.junit.Test;
import io.netty.buffer.Unpooled;
public class MessageJournalTest extends ActiveMQTestBase {
@Test
@ -78,10 +83,8 @@ public class MessageJournalTest extends ActiveMQTestBase {
} finally {
journalStorageManager.getMessageJournal().stop();
}
}
@Test
public void testStoreAMQP() throws Throwable {
ActiveMQServer server = createServer(true);
@ -90,9 +93,9 @@ public class MessageJournalTest extends ActiveMQTestBase {
ProtonProtocolManagerFactory factory = (ProtonProtocolManagerFactory) server.getRemotingService().getProtocolFactoryMap().get("AMQP");
Message protonJMessage = Message.Factory.create();
MessageImpl protonJMessage = (MessageImpl) Message.Factory.create();
AMQPMessage message = new AMQPMessage(protonJMessage);
AMQPMessage message = encodeAndCreateAMQPMessage(protonJMessage);
message.setMessageID(333);
@ -117,14 +120,19 @@ public class MessageJournalTest extends ActiveMQTestBase {
try {
journalStorageManager.getMessageJournal().start();
journalStorageManager.getMessageJournal().load(committedRecords, preparedTransactions, transactionFailure);
Assert.assertEquals(1, committedRecords.size());
} finally {
journalStorageManager.getMessageJournal().stop();
}
}
private AMQPMessage encodeAndCreateAMQPMessage(MessageImpl message) {
NettyWritable encoded = new NettyWritable(Unpooled.buffer(1024));
message.encode(encoded);
NettyReadable readable = new NettyReadable(encoded.getByteBuf());
return new AMQPMessage(AMQPMessage.DEFAULT_MESSAGE_FORMAT, readable, null, null);
}
}