From 448f72738b1b7e3f4bc0a757e3905e9c73fb2336 Mon Sep 17 00:00:00 2001 From: Timothy Bish Date: Tue, 13 Aug 2019 11:48:13 -0400 Subject: [PATCH] ARTEMIS-2437 Allow extended types in annotations in AMQP to Core When converting from AMQP to core and back again support annotations that aren't able to be placed into Core message properties by storing the bytes from encoding the types to AMQP encodings and then decoding them again when converting back into AMQP messages. Requires update to proton-j 0.33.2 for encoding fix --- .../amqp/converter/AMQPMessageSupport.java | 4 + .../amqp/converter/AmqpCoreConverter.java | 37 +++- .../amqp/converter/CoreAmqpConverter.java | 44 ++++- .../amqp/converter/TestConversions.java | 178 +++++++++++++++++- pom.xml | 2 +- .../amqp/AmqpExpiredMessageTest.java | 4 +- .../amqp/AmqpLargeMessageTest.java | 104 ++++++++++ 7 files changed, 364 insertions(+), 9 deletions(-) diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AMQPMessageSupport.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AMQPMessageSupport.java index 5f739504e8..21116a6c65 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AMQPMessageSupport.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AMQPMessageSupport.java @@ -155,6 +155,7 @@ public final class AMQPMessageSupport { public static final String DELIVERY_ANNOTATION_PREFIX = "DA_"; public static final String MESSAGE_ANNOTATION_PREFIX = "MA_"; public static final String FOOTER_PREFIX = "FT_"; + public static final String ENCODED_PREFIX = "ENCODED_"; public static final String JMS_AMQP_HEADER = JMS_AMQP_PREFIX + HEADER; public static final String JMS_AMQP_HEADER_DURABLE = JMS_AMQP_PREFIX + HEADER + DURABLE; @@ -168,6 +169,9 @@ public final class AMQPMessageSupport { public static final String JMS_AMQP_DELIVERY_ANNOTATION_PREFIX = JMS_AMQP_PREFIX + DELIVERY_ANNOTATION_PREFIX; public static final String JMS_AMQP_MESSAGE_ANNOTATION_PREFIX = JMS_AMQP_PREFIX + MESSAGE_ANNOTATION_PREFIX; public static final String JMS_AMQP_FOOTER_PREFIX = JMS_AMQP_PREFIX + FOOTER_PREFIX; + public static final String JMS_AMQP_ENCODED_DELIVERY_ANNOTATION_PREFIX = JMS_AMQP_PREFIX + ENCODED_PREFIX + DELIVERY_ANNOTATION_PREFIX; + public static final String JMS_AMQP_ENCODED_MESSAGE_ANNOTATION_PREFIX = JMS_AMQP_PREFIX + ENCODED_PREFIX + MESSAGE_ANNOTATION_PREFIX; + public static final String JMS_AMQP_ENCODED_FOOTER_PREFIX = JMS_AMQP_PREFIX + ENCODED_PREFIX + FOOTER_PREFIX; public static final String JMS_AMQP_ORIGINAL_ENCODING = JMS_AMQP_PREFIX + ORIGINAL_ENCODING; // Message body type definitions diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AmqpCoreConverter.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AmqpCoreConverter.java index 739d4373da..3c21e08fae 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AmqpCoreConverter.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AmqpCoreConverter.java @@ -28,6 +28,8 @@ import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSup import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.AMQP_VALUE_STRING; import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_CONTENT_ENCODING; import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_CONTENT_TYPE; +import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_ENCODED_FOOTER_PREFIX; +import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_ENCODED_MESSAGE_ANNOTATION_PREFIX; import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_FIRST_ACQUIRER; import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_FOOTER_PREFIX; import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_HEADER; @@ -60,6 +62,7 @@ import java.util.UUID; import javax.jms.DeliveryMode; import javax.jms.JMSException; +import org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException; import org.apache.activemq.artemis.api.core.ICoreMessage; import org.apache.activemq.artemis.core.message.impl.CoreMessageObjectPools; import org.apache.activemq.artemis.jms.client.ActiveMQDestination; @@ -88,6 +91,7 @@ import org.apache.qpid.proton.amqp.messaging.Header; import org.apache.qpid.proton.amqp.messaging.MessageAnnotations; import org.apache.qpid.proton.amqp.messaging.Properties; import org.apache.qpid.proton.amqp.messaging.Section; +import org.apache.qpid.proton.codec.EncoderImpl; import org.apache.qpid.proton.codec.WritableBuffer; import io.netty.buffer.ByteBuf; @@ -280,7 +284,11 @@ public class AmqpCoreConverter { } } - setProperty(jms, JMS_AMQP_MESSAGE_ANNOTATION_PREFIX + key, entry.getValue()); + try { + setProperty(jms, JMS_AMQP_MESSAGE_ANNOTATION_PREFIX + key, entry.getValue()); + } catch (ActiveMQPropertyConversionException e) { + encodeUnsupportedMessagePropertyType(jms, JMS_AMQP_ENCODED_MESSAGE_ANNOTATION_PREFIX + key, entry.getValue()); + } } } @@ -403,15 +411,38 @@ public class AmqpCoreConverter { @SuppressWarnings("unchecked") private static ServerJMSMessage processFooter(ServerJMSMessage jms, Footer footer) throws Exception { if (footer != null && footer.getValue() != null) { - for (Map.Entry entry : (Set>) footer.getValue().entrySet()) { + for (Map.Entry entry : (Set>) footer.getValue().entrySet()) { String key = entry.getKey().toString(); - setProperty(jms, JMS_AMQP_FOOTER_PREFIX + key, entry.getValue()); + try { + setProperty(jms, JMS_AMQP_FOOTER_PREFIX + key, entry.getValue()); + } catch (ActiveMQPropertyConversionException e) { + encodeUnsupportedMessagePropertyType(jms, JMS_AMQP_ENCODED_FOOTER_PREFIX + key, entry.getValue()); + } } } return jms; } + private static void encodeUnsupportedMessagePropertyType(ServerJMSMessage jms, String key, Object value) throws JMSException { + final ByteBuf buffer = PooledByteBufAllocator.DEFAULT.heapBuffer(); + final EncoderImpl encoder = TLSEncode.getEncoder(); + + try { + encoder.setByteBuffer(new NettyWritable(buffer)); + encoder.writeObject(value); + + final byte[] encodedBytes = new byte[buffer.writerIndex()]; + + buffer.readBytes(encodedBytes); + + setProperty(jms, key, encodedBytes); + } finally { + encoder.setByteBuffer((WritableBuffer) null); + buffer.release(); + } + } + private static void setProperty(javax.jms.Message msg, String key, Object value) throws JMSException { if (value instanceof UnsignedLong) { long v = ((UnsignedLong) value).longValue(); diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/CoreAmqpConverter.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/CoreAmqpConverter.java index 453b7ec41c..5c6a8f3455 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/CoreAmqpConverter.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/CoreAmqpConverter.java @@ -31,6 +31,9 @@ import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSup import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_CONTENT_ENCODING; import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_CONTENT_TYPE; import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_DELIVERY_ANNOTATION_PREFIX; +import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_ENCODED_DELIVERY_ANNOTATION_PREFIX; +import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_ENCODED_FOOTER_PREFIX; +import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_ENCODED_MESSAGE_ANNOTATION_PREFIX; import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_FIRST_ACQUIRER; import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_FOOTER_PREFIX; import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_HEADER; @@ -91,7 +94,9 @@ import org.apache.qpid.proton.amqp.messaging.Header; import org.apache.qpid.proton.amqp.messaging.MessageAnnotations; import org.apache.qpid.proton.amqp.messaging.Properties; import org.apache.qpid.proton.amqp.messaging.Section; +import org.apache.qpid.proton.codec.DecoderImpl; import org.apache.qpid.proton.codec.EncoderImpl; +import org.apache.qpid.proton.codec.ReadableBuffer.ByteBufferReader; import org.apache.qpid.proton.codec.WritableBuffer; import org.jboss.logging.Logger; @@ -131,7 +136,7 @@ public class CoreAmqpConverter { Map daMap = null; final Map maMap = new HashMap<>(); Map apMap = null; - Map footerMap = null; + Map footerMap = null; Section body = convertBody(message, maMap, properties); @@ -261,10 +266,21 @@ public class CoreAmqpConverter { String name = key.substring(JMS_AMQP_DELIVERY_ANNOTATION_PREFIX.length()); daMap.put(Symbol.valueOf(name), message.getObjectProperty(key)); continue; + } else if (key.startsWith(JMS_AMQP_ENCODED_DELIVERY_ANNOTATION_PREFIX)) { + if (daMap == null) { + daMap = new HashMap<>(); + } + String name = key.substring(JMS_AMQP_ENCODED_DELIVERY_ANNOTATION_PREFIX.length()); + daMap.put(Symbol.valueOf(name), decodeEmbeddedAMQPType(message.getObjectProperty(key))); + continue; } else if (key.startsWith(JMS_AMQP_MESSAGE_ANNOTATION_PREFIX)) { String name = key.substring(JMS_AMQP_MESSAGE_ANNOTATION_PREFIX.length()); maMap.put(Symbol.valueOf(name), message.getObjectProperty(key)); continue; + } else if (key.startsWith(JMS_AMQP_ENCODED_MESSAGE_ANNOTATION_PREFIX)) { + String name = key.substring(JMS_AMQP_ENCODED_MESSAGE_ANNOTATION_PREFIX.length()); + maMap.put(Symbol.valueOf(name), decodeEmbeddedAMQPType(message.getObjectProperty(key))); + continue; } else if (key.equals(JMS_AMQP_CONTENT_TYPE)) { properties.setContentType(Symbol.getSymbol(message.getStringProperty(key))); continue; @@ -277,12 +293,19 @@ public class CoreAmqpConverter { } else if (key.equals(JMS_AMQP_ORIGINAL_ENCODING)) { // skip..remove annotation from previous inbound transformation continue; + } else if (key.startsWith(JMS_AMQP_ENCODED_FOOTER_PREFIX)) { + if (footerMap == null) { + footerMap = new HashMap<>(); + } + String name = key.substring(JMS_AMQP_ENCODED_FOOTER_PREFIX.length()); + footerMap.put(Symbol.valueOf(name), decodeEmbeddedAMQPType(message.getObjectProperty(key))); + continue; } else if (key.startsWith(JMS_AMQP_FOOTER_PREFIX)) { if (footerMap == null) { footerMap = new HashMap<>(); } String name = key.substring(JMS_AMQP_FOOTER_PREFIX.length()); - footerMap.put(name, message.getObjectProperty(key)); + footerMap.put(Symbol.valueOf(name), message.getObjectProperty(key)); continue; } } else if (key.equals(Message.HDR_GROUP_ID.toString())) { @@ -351,6 +374,23 @@ public class CoreAmqpConverter { } } + private static Object decodeEmbeddedAMQPType(Object payload) { + final byte[] encodedType = (byte[]) payload; + + final DecoderImpl decoder = TLSEncode.getDecoder(); + Object decodedType = null; + + decoder.setBuffer(ByteBufferReader.wrap(encodedType)); + + try { + decodedType = decoder.readObject(); + } finally { + decoder.setBuffer(null); + } + + return decodedType; + } + private static Section convertBody(ServerJMSMessage message, Map maMap, Properties properties) throws JMSException { Section body = null; diff --git a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/TestConversions.java b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/TestConversions.java index d3976a9b38..37b1103b4d 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/TestConversions.java +++ b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/TestConversions.java @@ -17,14 +17,22 @@ package org.apache.activemq.artemis.protocol.amqp.converter; import java.nio.ByteBuffer; +import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.AMQP_NULL; +import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_ENCODED_DELIVERY_ANNOTATION_PREFIX; +import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_ENCODED_FOOTER_PREFIX; +import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_ENCODED_MESSAGE_ANNOTATION_PREFIX; +import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_ORIGINAL_ENCODING; + import java.util.HashMap; +import java.util.LinkedHashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; -import io.netty.buffer.ByteBuf; import org.apache.activemq.artemis.api.core.ICoreMessage; import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.buffers.impl.ResetLimitWrappedActiveMQBuffer; +import org.apache.activemq.artemis.core.message.impl.CoreMessage; import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage; import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSBytesMessage; import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMapMessage; @@ -33,19 +41,25 @@ import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSStreamMe import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSTextMessage; import org.apache.activemq.artemis.protocol.amqp.util.NettyReadable; import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable; +import org.apache.activemq.artemis.protocol.amqp.util.TLSEncode; import org.apache.activemq.artemis.utils.collections.TypedProperties; import org.apache.qpid.proton.amqp.Binary; +import org.apache.qpid.proton.amqp.Symbol; import org.apache.qpid.proton.amqp.messaging.AmqpSequence; import org.apache.qpid.proton.amqp.messaging.AmqpValue; import org.apache.qpid.proton.amqp.messaging.ApplicationProperties; import org.apache.qpid.proton.amqp.messaging.Data; +import org.apache.qpid.proton.amqp.messaging.Footer; import org.apache.qpid.proton.amqp.messaging.MessageAnnotations; +import org.apache.qpid.proton.codec.EncoderImpl; +import org.apache.qpid.proton.codec.WritableBuffer; import org.apache.qpid.proton.message.Message; import org.apache.qpid.proton.message.impl.MessageImpl; import org.jboss.logging.Logger; import org.junit.Assert; import org.junit.Test; +import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; @@ -232,6 +246,157 @@ public class TestConversions extends Assert { assertEquals(text, textMessage.getText()); } + @SuppressWarnings("unchecked") + @Test + public void testConvertMessageWithMapInMessageAnnotations() throws Exception { + Map mapprop = createPropertiesMap(); + ApplicationProperties properties = new ApplicationProperties(mapprop); + MessageImpl message = (MessageImpl) Message.Factory.create(); + message.setApplicationProperties(properties); + + final String annotationName = "x-opt-test-annotation"; + final Symbol annotationNameSymbol = Symbol.valueOf(annotationName); + + Map embeddedMap = new LinkedHashMap<>(); + embeddedMap.put("key1", "value1"); + embeddedMap.put("key2", "value2"); + embeddedMap.put("key3", "value3"); + Map annotationsMap = new LinkedHashMap<>(); + annotationsMap.put(annotationNameSymbol, embeddedMap); + MessageAnnotations messageAnnotations = new MessageAnnotations(annotationsMap); + byte[] encodedEmbeddedMap = encodeObject(embeddedMap); + + Map mapValues = new HashMap<>(); + mapValues.put("somestr", "value"); + mapValues.put("someint", Integer.valueOf(1)); + + message.setMessageAnnotations(messageAnnotations); + message.setBody(new AmqpValue(mapValues)); + + AMQPMessage encodedMessage = encodeAndCreateAMQPMessage(message); + + ICoreMessage serverMessage = encodedMessage.toCore(); + serverMessage.getReadOnlyBodyBuffer(); + + ServerJMSMapMessage mapMessage = (ServerJMSMapMessage) ServerJMSMessage.wrapCoreMessage(serverMessage); + mapMessage.decode(); + + verifyProperties(mapMessage); + + assertEquals(1, mapMessage.getInt("someint")); + assertEquals("value", mapMessage.getString("somestr")); + assertTrue(mapMessage.propertyExists(JMS_AMQP_ENCODED_MESSAGE_ANNOTATION_PREFIX + annotationName)); + assertArrayEquals(encodedEmbeddedMap, (byte[]) mapMessage.getObjectProperty(JMS_AMQP_ENCODED_MESSAGE_ANNOTATION_PREFIX + annotationName)); + + AMQPMessage newAMQP = CoreAmqpConverter.fromCore(mapMessage.getInnerMessage()); + assertNotNull(newAMQP.getBody()); + assertNotNull(newAMQP.getMessageAnnotations()); + assertNotNull(newAMQP.getMessageAnnotations().getValue()); + assertTrue(newAMQP.getMessageAnnotations().getValue().containsKey(annotationNameSymbol)); + Object result = newAMQP.getMessageAnnotations().getValue().get(annotationNameSymbol); + assertTrue(result instanceof Map); + assertEquals(embeddedMap, (Map) result); + } + + @SuppressWarnings("unchecked") + @Test + public void testConvertMessageWithMapInFooter() throws Exception { + Map mapprop = createPropertiesMap(); + ApplicationProperties properties = new ApplicationProperties(mapprop); + MessageImpl message = (MessageImpl) Message.Factory.create(); + message.setApplicationProperties(properties); + + final String footerName = "test-footer"; + final Symbol footerNameSymbol = Symbol.valueOf(footerName); + + Map embeddedMap = new LinkedHashMap<>(); + embeddedMap.put("key1", "value1"); + embeddedMap.put("key2", "value2"); + embeddedMap.put("key3", "value3"); + Map footerMap = new LinkedHashMap<>(); + footerMap.put(footerNameSymbol, embeddedMap); + Footer messageFooter = new Footer(footerMap); + byte[] encodedEmbeddedMap = encodeObject(embeddedMap); + + Map mapValues = new HashMap<>(); + mapValues.put("somestr", "value"); + mapValues.put("someint", Integer.valueOf(1)); + + message.setFooter(messageFooter); + message.setBody(new AmqpValue(mapValues)); + + AMQPMessage encodedMessage = encodeAndCreateAMQPMessage(message); + + ICoreMessage serverMessage = encodedMessage.toCore(); + serverMessage.getReadOnlyBodyBuffer(); + + ServerJMSMapMessage mapMessage = (ServerJMSMapMessage) ServerJMSMessage.wrapCoreMessage(serverMessage); + mapMessage.decode(); + + verifyProperties(mapMessage); + + assertEquals(1, mapMessage.getInt("someint")); + assertEquals("value", mapMessage.getString("somestr")); + assertTrue(mapMessage.propertyExists(JMS_AMQP_ENCODED_FOOTER_PREFIX + footerName)); + assertArrayEquals(encodedEmbeddedMap, (byte[]) mapMessage.getObjectProperty(JMS_AMQP_ENCODED_FOOTER_PREFIX + footerName)); + + AMQPMessage newAMQP = CoreAmqpConverter.fromCore(mapMessage.getInnerMessage()); + assertNotNull(newAMQP.getBody()); + assertNotNull(newAMQP.getFooter()); + assertNotNull(newAMQP.getFooter().getValue()); + assertTrue(newAMQP.getFooter().getValue().containsKey(footerNameSymbol)); + Object result = newAMQP.getFooter().getValue().get(footerNameSymbol); + assertTrue(result instanceof Map); + assertEquals(embeddedMap, (Map) result); + } + + @SuppressWarnings("unchecked") + @Test + public void testConvertFromCoreWithEncodedDeliveryAnnotationProperty() throws Exception { + + final String annotationName = "x-opt-test-annotation"; + final Symbol annotationNameSymbol = Symbol.valueOf(annotationName); + + Map embeddedMap = new LinkedHashMap<>(); + embeddedMap.put("key1", "value1"); + embeddedMap.put("key2", "value2"); + embeddedMap.put("key3", "value3"); + + byte[] encodedEmbeddedMap = encodeObject(embeddedMap); + + ServerJMSMessage serverMessage = createMessage(); + + serverMessage.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_NULL); + serverMessage.setObjectProperty(JMS_AMQP_ENCODED_DELIVERY_ANNOTATION_PREFIX + annotationName, encodedEmbeddedMap); + serverMessage.encode(); + + AMQPMessage newAMQP = CoreAmqpConverter.fromCore(serverMessage.getInnerMessage()); + assertNull(newAMQP.getBody()); + assertNotNull(newAMQP.getDeliveryAnnotations()); + assertNotNull(newAMQP.getDeliveryAnnotations().getValue()); + assertTrue(newAMQP.getDeliveryAnnotations().getValue().containsKey(annotationNameSymbol)); + Object result = newAMQP.getDeliveryAnnotations().getValue().get(annotationNameSymbol); + assertTrue(result instanceof Map); + assertEquals(embeddedMap, (Map) result); + } + + private byte[] encodeObject(Object toEncode) { + ByteBuf scratch = Unpooled.buffer(); + EncoderImpl encoder = TLSEncode.getEncoder(); + encoder.setByteBuffer(new NettyWritable(scratch)); + + try { + encoder.writeObject(toEncode); + } finally { + encoder.setByteBuffer((WritableBuffer) null); + } + + byte[] result = new byte[scratch.writerIndex()]; + scratch.readBytes(result); + + return result; + } + @Test public void testEditAndConvert() throws Exception { @@ -323,4 +488,15 @@ public class TestConversions extends Assert { return new AMQPMessage(AMQPMessage.DEFAULT_MESSAGE_FORMAT, readable, null, null); } + + private ServerJMSMessage createMessage() { + return new ServerJMSMessage(newMessage(org.apache.activemq.artemis.api.core.Message.DEFAULT_TYPE)); + } + + private CoreMessage newMessage(byte messageType) { + CoreMessage message = new CoreMessage(0, 512); + message.setType(messageType); + ((ResetLimitWrappedActiveMQBuffer) message.getBodyBuffer()).setMessage(null); + return message; + } } diff --git a/pom.xml b/pom.xml index 23570dfd2a..67ed992b5d 100644 --- a/pom.xml +++ b/pom.xml @@ -92,7 +92,7 @@ 2.25.0 4.1.34.Final 2.0.22.Final - 0.33.1 + 0.33.2 3.0.19.Final 1.7.21 0.43.0 diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpExpiredMessageTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpExpiredMessageTest.java index cfcbc209cb..759a854f1b 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpExpiredMessageTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpExpiredMessageTest.java @@ -171,7 +171,7 @@ public class AmqpExpiredMessageTest extends AmqpClientTestSupport { sender.send(message); sender.close(); - assertEquals(1, queueView.getMessageCount()); + Wait.assertEquals(1, queueView::getMessageCount); // Now try and get the message AmqpReceiver receiver = session.createReceiver(getQueueName()); @@ -204,7 +204,7 @@ public class AmqpExpiredMessageTest extends AmqpClientTestSupport { sender.send(message); sender.close(); - assertEquals(1, queueView.getMessageCount()); + Wait.assertEquals(1, queueView::getMessageCount); Thread.sleep(1000); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpLargeMessageTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpLargeMessageTest.java index 15bfb21148..10ccc95be5 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpLargeMessageTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpLargeMessageTest.java @@ -20,6 +20,7 @@ import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Arrays; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Random; @@ -51,6 +52,7 @@ import org.apache.activemq.transport.amqp.client.AmqpSender; import org.apache.activemq.transport.amqp.client.AmqpSession; import org.apache.qpid.jms.JmsConnectionFactory; import org.apache.qpid.proton.amqp.Binary; +import org.apache.qpid.proton.amqp.Symbol; import org.apache.qpid.proton.amqp.messaging.AmqpSequence; import org.apache.qpid.proton.amqp.messaging.AmqpValue; import org.apache.qpid.proton.amqp.messaging.Data; @@ -115,6 +117,52 @@ public class AmqpLargeMessageTest extends AmqpClientTestSupport { } } + @Test(timeout = 60000) + public void testSendAMQPMessageWithComplexAnnotationsReceiveCore() throws Exception { + server.getAddressSettingsRepository().addMatch("#", new AddressSettings().setDefaultAddressRoutingType(RoutingType.ANYCAST)); + + AmqpClient client = createAmqpClient(); + AmqpConnection connection = addConnection(client.connect()); + try { + connection.connect(); + + String annotation = "x-opt-embedded-map"; + Map embeddedMap = new LinkedHashMap<>(); + embeddedMap.put("test-key-1", "value-1"); + embeddedMap.put("test-key-2", "value-2"); + embeddedMap.put("test-key-3", "value-3"); + + AmqpSession session = connection.createSession(); + AmqpSender sender = session.createSender(testQueueName); + AmqpMessage message = createAmqpMessage((byte) 'A', PAYLOAD); + + message.setApplicationProperty("IntProperty", (Integer) 42); + message.setDurable(true); + message.setMessageAnnotation(annotation, embeddedMap); + sender.send(message); + + session.close(); + + Wait.assertEquals(1, () -> getMessageCount(server.getPostOffice(), testQueueName)); + + try (ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(); + Connection connection2 = factory.createConnection()) { + + Session session2 = connection2.createSession(false, Session.AUTO_ACKNOWLEDGE); + connection2.start(); + MessageConsumer consumer = session2.createConsumer(session2.createQueue(testQueueName)); + + Message received = consumer.receive(5000); + Assert.assertNotNull(received); + Assert.assertEquals(42, received.getIntProperty("IntProperty")); + + connection2.close(); + } + } finally { + connection.close(); + } + } + @Test(timeout = 60000) public void testSendAMQPReceiveOpenWire() throws Exception { server.getAddressSettingsRepository().addMatch("#", new AddressSettings().setDefaultAddressRoutingType(RoutingType.ANYCAST)); @@ -205,6 +253,62 @@ public class AmqpLargeMessageTest extends AmqpClientTestSupport { } } + @Test(timeout = 60000) + public void testSendAMQPMessageWithComplexAnnotationsReceiveAMQP() throws Exception { + server.getAddressSettingsRepository().addMatch("#", new AddressSettings().setDefaultAddressRoutingType(RoutingType.ANYCAST)); + + String testQueueName = "ConnectionFrameSize"; + int nMsgs = 200; + + AmqpClient client = createAmqpClient(); + + Symbol annotation = Symbol.valueOf("x-opt-embedded-map"); + Map embeddedMap = new LinkedHashMap<>(); + embeddedMap.put("test-key-1", "value-1"); + embeddedMap.put("test-key-2", "value-2"); + embeddedMap.put("test-key-3", "value-3"); + + { + AmqpConnection connection = addConnection(client.connect()); + AmqpSession session = connection.createSession(); + AmqpSender sender = session.createSender(testQueueName); + AmqpMessage message = createAmqpMessage((byte) 'A', PAYLOAD); + + message.setApplicationProperty("IntProperty", (Integer) 42); + message.setDurable(true); + message.setMessageAnnotation(annotation.toString(), embeddedMap); + sender.send(message); + session.close(); + connection.close(); + } + + Wait.assertEquals(1, () -> getMessageCount(server.getPostOffice(), testQueueName)); + + { + AmqpConnection connection = addConnection(client.connect()); + AmqpSession session = connection.createSession(); + AmqpReceiver receiver = session.createReceiver(testQueueName); + receiver.flow(nMsgs); + + AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS); + assertNotNull("Failed to read message with embedded map in annotations", message); + MessageImpl wrapped = (MessageImpl) message.getWrappedMessage(); + if (wrapped.getBody() instanceof Data) { + Data data = (Data) wrapped.getBody(); + System.out.println("received : message: " + data.getValue().getLength()); + assertEquals(PAYLOAD, data.getValue().getLength()); + } + + assertNotNull(message.getWrappedMessage().getMessageAnnotations()); + assertNotNull(message.getWrappedMessage().getMessageAnnotations().getValue()); + assertEquals(embeddedMap, message.getWrappedMessage().getMessageAnnotations().getValue().get(annotation)); + + message.accept(); + session.close(); + connection.close(); + } + } + @Test(timeout = 60000) public void testSendAMQPReceiveAMQPViaJMSObjectMessage() throws Exception { server.getAddressSettingsRepository().addMatch("#", new AddressSettings().setDefaultAddressRoutingType(RoutingType.ANYCAST));