ARTEMIS-2437 Allow extended types in annotations in AMQP to Core

When converting from AMQP to core and back again support annotations that
aren't able to be placed into Core message properties by storing the bytes
from encoding the types to AMQP encodings and then decoding them again
when converting back into AMQP messages.

Requires update to proton-j 0.33.2 for encoding fix
This commit is contained in:
Timothy Bish 2019-08-13 11:48:13 -04:00 committed by Clebert Suconic
parent 60e5bf96c7
commit 448f72738b
7 changed files with 364 additions and 9 deletions

View File

@ -155,6 +155,7 @@ public final class AMQPMessageSupport {
public static final String DELIVERY_ANNOTATION_PREFIX = "DA_";
public static final String MESSAGE_ANNOTATION_PREFIX = "MA_";
public static final String FOOTER_PREFIX = "FT_";
public static final String ENCODED_PREFIX = "ENCODED_";
public static final String JMS_AMQP_HEADER = JMS_AMQP_PREFIX + HEADER;
public static final String JMS_AMQP_HEADER_DURABLE = JMS_AMQP_PREFIX + HEADER + DURABLE;
@ -168,6 +169,9 @@ public final class AMQPMessageSupport {
public static final String JMS_AMQP_DELIVERY_ANNOTATION_PREFIX = JMS_AMQP_PREFIX + DELIVERY_ANNOTATION_PREFIX;
public static final String JMS_AMQP_MESSAGE_ANNOTATION_PREFIX = JMS_AMQP_PREFIX + MESSAGE_ANNOTATION_PREFIX;
public static final String JMS_AMQP_FOOTER_PREFIX = JMS_AMQP_PREFIX + FOOTER_PREFIX;
public static final String JMS_AMQP_ENCODED_DELIVERY_ANNOTATION_PREFIX = JMS_AMQP_PREFIX + ENCODED_PREFIX + DELIVERY_ANNOTATION_PREFIX;
public static final String JMS_AMQP_ENCODED_MESSAGE_ANNOTATION_PREFIX = JMS_AMQP_PREFIX + ENCODED_PREFIX + MESSAGE_ANNOTATION_PREFIX;
public static final String JMS_AMQP_ENCODED_FOOTER_PREFIX = JMS_AMQP_PREFIX + ENCODED_PREFIX + FOOTER_PREFIX;
public static final String JMS_AMQP_ORIGINAL_ENCODING = JMS_AMQP_PREFIX + ORIGINAL_ENCODING;
// Message body type definitions

View File

@ -28,6 +28,8 @@ import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSup
import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.AMQP_VALUE_STRING;
import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_CONTENT_ENCODING;
import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_CONTENT_TYPE;
import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_ENCODED_FOOTER_PREFIX;
import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_ENCODED_MESSAGE_ANNOTATION_PREFIX;
import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_FIRST_ACQUIRER;
import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_FOOTER_PREFIX;
import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_HEADER;
@ -60,6 +62,7 @@ import java.util.UUID;
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException;
import org.apache.activemq.artemis.api.core.ICoreMessage;
import org.apache.activemq.artemis.core.message.impl.CoreMessageObjectPools;
import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
@ -88,6 +91,7 @@ import org.apache.qpid.proton.amqp.messaging.Header;
import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
import org.apache.qpid.proton.amqp.messaging.Properties;
import org.apache.qpid.proton.amqp.messaging.Section;
import org.apache.qpid.proton.codec.EncoderImpl;
import org.apache.qpid.proton.codec.WritableBuffer;
import io.netty.buffer.ByteBuf;
@ -280,7 +284,11 @@ public class AmqpCoreConverter {
}
}
setProperty(jms, JMS_AMQP_MESSAGE_ANNOTATION_PREFIX + key, entry.getValue());
try {
setProperty(jms, JMS_AMQP_MESSAGE_ANNOTATION_PREFIX + key, entry.getValue());
} catch (ActiveMQPropertyConversionException e) {
encodeUnsupportedMessagePropertyType(jms, JMS_AMQP_ENCODED_MESSAGE_ANNOTATION_PREFIX + key, entry.getValue());
}
}
}
@ -403,15 +411,38 @@ public class AmqpCoreConverter {
@SuppressWarnings("unchecked")
private static ServerJMSMessage processFooter(ServerJMSMessage jms, Footer footer) throws Exception {
if (footer != null && footer.getValue() != null) {
for (Map.Entry<Object, Object> entry : (Set<Map.Entry<Object, Object>>) footer.getValue().entrySet()) {
for (Map.Entry<Symbol, Object> entry : (Set<Map.Entry<Symbol, Object>>) footer.getValue().entrySet()) {
String key = entry.getKey().toString();
setProperty(jms, JMS_AMQP_FOOTER_PREFIX + key, entry.getValue());
try {
setProperty(jms, JMS_AMQP_FOOTER_PREFIX + key, entry.getValue());
} catch (ActiveMQPropertyConversionException e) {
encodeUnsupportedMessagePropertyType(jms, JMS_AMQP_ENCODED_FOOTER_PREFIX + key, entry.getValue());
}
}
}
return jms;
}
private static void encodeUnsupportedMessagePropertyType(ServerJMSMessage jms, String key, Object value) throws JMSException {
final ByteBuf buffer = PooledByteBufAllocator.DEFAULT.heapBuffer();
final EncoderImpl encoder = TLSEncode.getEncoder();
try {
encoder.setByteBuffer(new NettyWritable(buffer));
encoder.writeObject(value);
final byte[] encodedBytes = new byte[buffer.writerIndex()];
buffer.readBytes(encodedBytes);
setProperty(jms, key, encodedBytes);
} finally {
encoder.setByteBuffer((WritableBuffer) null);
buffer.release();
}
}
private static void setProperty(javax.jms.Message msg, String key, Object value) throws JMSException {
if (value instanceof UnsignedLong) {
long v = ((UnsignedLong) value).longValue();

View File

@ -31,6 +31,9 @@ import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSup
import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_CONTENT_ENCODING;
import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_CONTENT_TYPE;
import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_DELIVERY_ANNOTATION_PREFIX;
import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_ENCODED_DELIVERY_ANNOTATION_PREFIX;
import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_ENCODED_FOOTER_PREFIX;
import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_ENCODED_MESSAGE_ANNOTATION_PREFIX;
import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_FIRST_ACQUIRER;
import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_FOOTER_PREFIX;
import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_HEADER;
@ -91,7 +94,9 @@ import org.apache.qpid.proton.amqp.messaging.Header;
import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
import org.apache.qpid.proton.amqp.messaging.Properties;
import org.apache.qpid.proton.amqp.messaging.Section;
import org.apache.qpid.proton.codec.DecoderImpl;
import org.apache.qpid.proton.codec.EncoderImpl;
import org.apache.qpid.proton.codec.ReadableBuffer.ByteBufferReader;
import org.apache.qpid.proton.codec.WritableBuffer;
import org.jboss.logging.Logger;
@ -131,7 +136,7 @@ public class CoreAmqpConverter {
Map<Symbol, Object> daMap = null;
final Map<Symbol, Object> maMap = new HashMap<>();
Map<String, Object> apMap = null;
Map<Object, Object> footerMap = null;
Map<Symbol, Object> footerMap = null;
Section body = convertBody(message, maMap, properties);
@ -261,10 +266,21 @@ public class CoreAmqpConverter {
String name = key.substring(JMS_AMQP_DELIVERY_ANNOTATION_PREFIX.length());
daMap.put(Symbol.valueOf(name), message.getObjectProperty(key));
continue;
} else if (key.startsWith(JMS_AMQP_ENCODED_DELIVERY_ANNOTATION_PREFIX)) {
if (daMap == null) {
daMap = new HashMap<>();
}
String name = key.substring(JMS_AMQP_ENCODED_DELIVERY_ANNOTATION_PREFIX.length());
daMap.put(Symbol.valueOf(name), decodeEmbeddedAMQPType(message.getObjectProperty(key)));
continue;
} else if (key.startsWith(JMS_AMQP_MESSAGE_ANNOTATION_PREFIX)) {
String name = key.substring(JMS_AMQP_MESSAGE_ANNOTATION_PREFIX.length());
maMap.put(Symbol.valueOf(name), message.getObjectProperty(key));
continue;
} else if (key.startsWith(JMS_AMQP_ENCODED_MESSAGE_ANNOTATION_PREFIX)) {
String name = key.substring(JMS_AMQP_ENCODED_MESSAGE_ANNOTATION_PREFIX.length());
maMap.put(Symbol.valueOf(name), decodeEmbeddedAMQPType(message.getObjectProperty(key)));
continue;
} else if (key.equals(JMS_AMQP_CONTENT_TYPE)) {
properties.setContentType(Symbol.getSymbol(message.getStringProperty(key)));
continue;
@ -277,12 +293,19 @@ public class CoreAmqpConverter {
} else if (key.equals(JMS_AMQP_ORIGINAL_ENCODING)) {
// skip..remove annotation from previous inbound transformation
continue;
} else if (key.startsWith(JMS_AMQP_ENCODED_FOOTER_PREFIX)) {
if (footerMap == null) {
footerMap = new HashMap<>();
}
String name = key.substring(JMS_AMQP_ENCODED_FOOTER_PREFIX.length());
footerMap.put(Symbol.valueOf(name), decodeEmbeddedAMQPType(message.getObjectProperty(key)));
continue;
} else if (key.startsWith(JMS_AMQP_FOOTER_PREFIX)) {
if (footerMap == null) {
footerMap = new HashMap<>();
}
String name = key.substring(JMS_AMQP_FOOTER_PREFIX.length());
footerMap.put(name, message.getObjectProperty(key));
footerMap.put(Symbol.valueOf(name), message.getObjectProperty(key));
continue;
}
} else if (key.equals(Message.HDR_GROUP_ID.toString())) {
@ -351,6 +374,23 @@ public class CoreAmqpConverter {
}
}
private static Object decodeEmbeddedAMQPType(Object payload) {
final byte[] encodedType = (byte[]) payload;
final DecoderImpl decoder = TLSEncode.getDecoder();
Object decodedType = null;
decoder.setBuffer(ByteBufferReader.wrap(encodedType));
try {
decodedType = decoder.readObject();
} finally {
decoder.setBuffer(null);
}
return decodedType;
}
private static Section convertBody(ServerJMSMessage message, Map<Symbol, Object> maMap, Properties properties) throws JMSException {
Section body = null;

View File

@ -17,14 +17,22 @@
package org.apache.activemq.artemis.protocol.amqp.converter;
import java.nio.ByteBuffer;
import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.AMQP_NULL;
import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_ENCODED_DELIVERY_ANNOTATION_PREFIX;
import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_ENCODED_FOOTER_PREFIX;
import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_ENCODED_MESSAGE_ANNOTATION_PREFIX;
import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_ORIGINAL_ENCODING;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import io.netty.buffer.ByteBuf;
import org.apache.activemq.artemis.api.core.ICoreMessage;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.buffers.impl.ResetLimitWrappedActiveMQBuffer;
import org.apache.activemq.artemis.core.message.impl.CoreMessage;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSBytesMessage;
import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMapMessage;
@ -33,19 +41,25 @@ import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSStreamMe
import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSTextMessage;
import org.apache.activemq.artemis.protocol.amqp.util.NettyReadable;
import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable;
import org.apache.activemq.artemis.protocol.amqp.util.TLSEncode;
import org.apache.activemq.artemis.utils.collections.TypedProperties;
import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.AmqpSequence;
import org.apache.qpid.proton.amqp.messaging.AmqpValue;
import org.apache.qpid.proton.amqp.messaging.ApplicationProperties;
import org.apache.qpid.proton.amqp.messaging.Data;
import org.apache.qpid.proton.amqp.messaging.Footer;
import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
import org.apache.qpid.proton.codec.EncoderImpl;
import org.apache.qpid.proton.codec.WritableBuffer;
import org.apache.qpid.proton.message.Message;
import org.apache.qpid.proton.message.impl.MessageImpl;
import org.jboss.logging.Logger;
import org.junit.Assert;
import org.junit.Test;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
@ -232,6 +246,157 @@ public class TestConversions extends Assert {
assertEquals(text, textMessage.getText());
}
@SuppressWarnings("unchecked")
@Test
public void testConvertMessageWithMapInMessageAnnotations() throws Exception {
Map<String, Object> mapprop = createPropertiesMap();
ApplicationProperties properties = new ApplicationProperties(mapprop);
MessageImpl message = (MessageImpl) Message.Factory.create();
message.setApplicationProperties(properties);
final String annotationName = "x-opt-test-annotation";
final Symbol annotationNameSymbol = Symbol.valueOf(annotationName);
Map<String, String> embeddedMap = new LinkedHashMap<>();
embeddedMap.put("key1", "value1");
embeddedMap.put("key2", "value2");
embeddedMap.put("key3", "value3");
Map<Symbol, Object> annotationsMap = new LinkedHashMap<>();
annotationsMap.put(annotationNameSymbol, embeddedMap);
MessageAnnotations messageAnnotations = new MessageAnnotations(annotationsMap);
byte[] encodedEmbeddedMap = encodeObject(embeddedMap);
Map<String, Object> mapValues = new HashMap<>();
mapValues.put("somestr", "value");
mapValues.put("someint", Integer.valueOf(1));
message.setMessageAnnotations(messageAnnotations);
message.setBody(new AmqpValue(mapValues));
AMQPMessage encodedMessage = encodeAndCreateAMQPMessage(message);
ICoreMessage serverMessage = encodedMessage.toCore();
serverMessage.getReadOnlyBodyBuffer();
ServerJMSMapMessage mapMessage = (ServerJMSMapMessage) ServerJMSMessage.wrapCoreMessage(serverMessage);
mapMessage.decode();
verifyProperties(mapMessage);
assertEquals(1, mapMessage.getInt("someint"));
assertEquals("value", mapMessage.getString("somestr"));
assertTrue(mapMessage.propertyExists(JMS_AMQP_ENCODED_MESSAGE_ANNOTATION_PREFIX + annotationName));
assertArrayEquals(encodedEmbeddedMap, (byte[]) mapMessage.getObjectProperty(JMS_AMQP_ENCODED_MESSAGE_ANNOTATION_PREFIX + annotationName));
AMQPMessage newAMQP = CoreAmqpConverter.fromCore(mapMessage.getInnerMessage());
assertNotNull(newAMQP.getBody());
assertNotNull(newAMQP.getMessageAnnotations());
assertNotNull(newAMQP.getMessageAnnotations().getValue());
assertTrue(newAMQP.getMessageAnnotations().getValue().containsKey(annotationNameSymbol));
Object result = newAMQP.getMessageAnnotations().getValue().get(annotationNameSymbol);
assertTrue(result instanceof Map);
assertEquals(embeddedMap, (Map<String, String>) result);
}
@SuppressWarnings("unchecked")
@Test
public void testConvertMessageWithMapInFooter() throws Exception {
Map<String, Object> mapprop = createPropertiesMap();
ApplicationProperties properties = new ApplicationProperties(mapprop);
MessageImpl message = (MessageImpl) Message.Factory.create();
message.setApplicationProperties(properties);
final String footerName = "test-footer";
final Symbol footerNameSymbol = Symbol.valueOf(footerName);
Map<String, String> embeddedMap = new LinkedHashMap<>();
embeddedMap.put("key1", "value1");
embeddedMap.put("key2", "value2");
embeddedMap.put("key3", "value3");
Map<Symbol, Object> footerMap = new LinkedHashMap<>();
footerMap.put(footerNameSymbol, embeddedMap);
Footer messageFooter = new Footer(footerMap);
byte[] encodedEmbeddedMap = encodeObject(embeddedMap);
Map<String, Object> mapValues = new HashMap<>();
mapValues.put("somestr", "value");
mapValues.put("someint", Integer.valueOf(1));
message.setFooter(messageFooter);
message.setBody(new AmqpValue(mapValues));
AMQPMessage encodedMessage = encodeAndCreateAMQPMessage(message);
ICoreMessage serverMessage = encodedMessage.toCore();
serverMessage.getReadOnlyBodyBuffer();
ServerJMSMapMessage mapMessage = (ServerJMSMapMessage) ServerJMSMessage.wrapCoreMessage(serverMessage);
mapMessage.decode();
verifyProperties(mapMessage);
assertEquals(1, mapMessage.getInt("someint"));
assertEquals("value", mapMessage.getString("somestr"));
assertTrue(mapMessage.propertyExists(JMS_AMQP_ENCODED_FOOTER_PREFIX + footerName));
assertArrayEquals(encodedEmbeddedMap, (byte[]) mapMessage.getObjectProperty(JMS_AMQP_ENCODED_FOOTER_PREFIX + footerName));
AMQPMessage newAMQP = CoreAmqpConverter.fromCore(mapMessage.getInnerMessage());
assertNotNull(newAMQP.getBody());
assertNotNull(newAMQP.getFooter());
assertNotNull(newAMQP.getFooter().getValue());
assertTrue(newAMQP.getFooter().getValue().containsKey(footerNameSymbol));
Object result = newAMQP.getFooter().getValue().get(footerNameSymbol);
assertTrue(result instanceof Map);
assertEquals(embeddedMap, (Map<String, String>) result);
}
@SuppressWarnings("unchecked")
@Test
public void testConvertFromCoreWithEncodedDeliveryAnnotationProperty() throws Exception {
final String annotationName = "x-opt-test-annotation";
final Symbol annotationNameSymbol = Symbol.valueOf(annotationName);
Map<String, String> embeddedMap = new LinkedHashMap<>();
embeddedMap.put("key1", "value1");
embeddedMap.put("key2", "value2");
embeddedMap.put("key3", "value3");
byte[] encodedEmbeddedMap = encodeObject(embeddedMap);
ServerJMSMessage serverMessage = createMessage();
serverMessage.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_NULL);
serverMessage.setObjectProperty(JMS_AMQP_ENCODED_DELIVERY_ANNOTATION_PREFIX + annotationName, encodedEmbeddedMap);
serverMessage.encode();
AMQPMessage newAMQP = CoreAmqpConverter.fromCore(serverMessage.getInnerMessage());
assertNull(newAMQP.getBody());
assertNotNull(newAMQP.getDeliveryAnnotations());
assertNotNull(newAMQP.getDeliveryAnnotations().getValue());
assertTrue(newAMQP.getDeliveryAnnotations().getValue().containsKey(annotationNameSymbol));
Object result = newAMQP.getDeliveryAnnotations().getValue().get(annotationNameSymbol);
assertTrue(result instanceof Map);
assertEquals(embeddedMap, (Map<String, String>) result);
}
private byte[] encodeObject(Object toEncode) {
ByteBuf scratch = Unpooled.buffer();
EncoderImpl encoder = TLSEncode.getEncoder();
encoder.setByteBuffer(new NettyWritable(scratch));
try {
encoder.writeObject(toEncode);
} finally {
encoder.setByteBuffer((WritableBuffer) null);
}
byte[] result = new byte[scratch.writerIndex()];
scratch.readBytes(result);
return result;
}
@Test
public void testEditAndConvert() throws Exception {
@ -323,4 +488,15 @@ public class TestConversions extends Assert {
return new AMQPMessage(AMQPMessage.DEFAULT_MESSAGE_FORMAT, readable, null, null);
}
private ServerJMSMessage createMessage() {
return new ServerJMSMessage(newMessage(org.apache.activemq.artemis.api.core.Message.DEFAULT_TYPE));
}
private CoreMessage newMessage(byte messageType) {
CoreMessage message = new CoreMessage(0, 512);
message.setType(messageType);
((ResetLimitWrappedActiveMQBuffer) message.getBodyBuffer()).setMessage(null);
return message;
}
}

View File

@ -92,7 +92,7 @@
<mockito.version>2.25.0</mockito.version>
<netty.version>4.1.34.Final</netty.version>
<netty-tcnative-version>2.0.22.Final</netty-tcnative-version>
<proton.version>0.33.1</proton.version>
<proton.version>0.33.2</proton.version>
<resteasy.version>3.0.19.Final</resteasy.version>
<slf4j.version>1.7.21</slf4j.version>
<qpid.jms.version>0.43.0</qpid.jms.version>

View File

@ -171,7 +171,7 @@ public class AmqpExpiredMessageTest extends AmqpClientTestSupport {
sender.send(message);
sender.close();
assertEquals(1, queueView.getMessageCount());
Wait.assertEquals(1, queueView::getMessageCount);
// Now try and get the message
AmqpReceiver receiver = session.createReceiver(getQueueName());
@ -204,7 +204,7 @@ public class AmqpExpiredMessageTest extends AmqpClientTestSupport {
sender.send(message);
sender.close();
assertEquals(1, queueView.getMessageCount());
Wait.assertEquals(1, queueView::getMessageCount);
Thread.sleep(1000);

View File

@ -20,6 +20,7 @@ import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
@ -51,6 +52,7 @@ import org.apache.activemq.transport.amqp.client.AmqpSender;
import org.apache.activemq.transport.amqp.client.AmqpSession;
import org.apache.qpid.jms.JmsConnectionFactory;
import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.AmqpSequence;
import org.apache.qpid.proton.amqp.messaging.AmqpValue;
import org.apache.qpid.proton.amqp.messaging.Data;
@ -115,6 +117,52 @@ public class AmqpLargeMessageTest extends AmqpClientTestSupport {
}
}
@Test(timeout = 60000)
public void testSendAMQPMessageWithComplexAnnotationsReceiveCore() throws Exception {
server.getAddressSettingsRepository().addMatch("#", new AddressSettings().setDefaultAddressRoutingType(RoutingType.ANYCAST));
AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect());
try {
connection.connect();
String annotation = "x-opt-embedded-map";
Map<String, String> embeddedMap = new LinkedHashMap<>();
embeddedMap.put("test-key-1", "value-1");
embeddedMap.put("test-key-2", "value-2");
embeddedMap.put("test-key-3", "value-3");
AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender(testQueueName);
AmqpMessage message = createAmqpMessage((byte) 'A', PAYLOAD);
message.setApplicationProperty("IntProperty", (Integer) 42);
message.setDurable(true);
message.setMessageAnnotation(annotation, embeddedMap);
sender.send(message);
session.close();
Wait.assertEquals(1, () -> getMessageCount(server.getPostOffice(), testQueueName));
try (ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
Connection connection2 = factory.createConnection()) {
Session session2 = connection2.createSession(false, Session.AUTO_ACKNOWLEDGE);
connection2.start();
MessageConsumer consumer = session2.createConsumer(session2.createQueue(testQueueName));
Message received = consumer.receive(5000);
Assert.assertNotNull(received);
Assert.assertEquals(42, received.getIntProperty("IntProperty"));
connection2.close();
}
} finally {
connection.close();
}
}
@Test(timeout = 60000)
public void testSendAMQPReceiveOpenWire() throws Exception {
server.getAddressSettingsRepository().addMatch("#", new AddressSettings().setDefaultAddressRoutingType(RoutingType.ANYCAST));
@ -205,6 +253,62 @@ public class AmqpLargeMessageTest extends AmqpClientTestSupport {
}
}
@Test(timeout = 60000)
public void testSendAMQPMessageWithComplexAnnotationsReceiveAMQP() throws Exception {
server.getAddressSettingsRepository().addMatch("#", new AddressSettings().setDefaultAddressRoutingType(RoutingType.ANYCAST));
String testQueueName = "ConnectionFrameSize";
int nMsgs = 200;
AmqpClient client = createAmqpClient();
Symbol annotation = Symbol.valueOf("x-opt-embedded-map");
Map<String, String> embeddedMap = new LinkedHashMap<>();
embeddedMap.put("test-key-1", "value-1");
embeddedMap.put("test-key-2", "value-2");
embeddedMap.put("test-key-3", "value-3");
{
AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender(testQueueName);
AmqpMessage message = createAmqpMessage((byte) 'A', PAYLOAD);
message.setApplicationProperty("IntProperty", (Integer) 42);
message.setDurable(true);
message.setMessageAnnotation(annotation.toString(), embeddedMap);
sender.send(message);
session.close();
connection.close();
}
Wait.assertEquals(1, () -> getMessageCount(server.getPostOffice(), testQueueName));
{
AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession();
AmqpReceiver receiver = session.createReceiver(testQueueName);
receiver.flow(nMsgs);
AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
assertNotNull("Failed to read message with embedded map in annotations", message);
MessageImpl wrapped = (MessageImpl) message.getWrappedMessage();
if (wrapped.getBody() instanceof Data) {
Data data = (Data) wrapped.getBody();
System.out.println("received : message: " + data.getValue().getLength());
assertEquals(PAYLOAD, data.getValue().getLength());
}
assertNotNull(message.getWrappedMessage().getMessageAnnotations());
assertNotNull(message.getWrappedMessage().getMessageAnnotations().getValue());
assertEquals(embeddedMap, message.getWrappedMessage().getMessageAnnotations().getValue().get(annotation));
message.accept();
session.close();
connection.close();
}
}
@Test(timeout = 60000)
public void testSendAMQPReceiveAMQPViaJMSObjectMessage() throws Exception {
server.getAddressSettingsRepository().addMatch("#", new AddressSettings().setDefaultAddressRoutingType(RoutingType.ANYCAST));