This commit is contained in:
Clebert Suconic 2019-08-18 22:23:52 -04:00
commit cfdec52719
7 changed files with 364 additions and 9 deletions

View File

@ -155,6 +155,7 @@ public final class AMQPMessageSupport {
public static final String DELIVERY_ANNOTATION_PREFIX = "DA_";
public static final String MESSAGE_ANNOTATION_PREFIX = "MA_";
public static final String FOOTER_PREFIX = "FT_";
public static final String ENCODED_PREFIX = "ENCODED_";
public static final String JMS_AMQP_HEADER = JMS_AMQP_PREFIX + HEADER;
public static final String JMS_AMQP_HEADER_DURABLE = JMS_AMQP_PREFIX + HEADER + DURABLE;
@ -168,6 +169,9 @@ public final class AMQPMessageSupport {
public static final String JMS_AMQP_DELIVERY_ANNOTATION_PREFIX = JMS_AMQP_PREFIX + DELIVERY_ANNOTATION_PREFIX;
public static final String JMS_AMQP_MESSAGE_ANNOTATION_PREFIX = JMS_AMQP_PREFIX + MESSAGE_ANNOTATION_PREFIX;
public static final String JMS_AMQP_FOOTER_PREFIX = JMS_AMQP_PREFIX + FOOTER_PREFIX;
public static final String JMS_AMQP_ENCODED_DELIVERY_ANNOTATION_PREFIX = JMS_AMQP_PREFIX + ENCODED_PREFIX + DELIVERY_ANNOTATION_PREFIX;
public static final String JMS_AMQP_ENCODED_MESSAGE_ANNOTATION_PREFIX = JMS_AMQP_PREFIX + ENCODED_PREFIX + MESSAGE_ANNOTATION_PREFIX;
public static final String JMS_AMQP_ENCODED_FOOTER_PREFIX = JMS_AMQP_PREFIX + ENCODED_PREFIX + FOOTER_PREFIX;
public static final String JMS_AMQP_ORIGINAL_ENCODING = JMS_AMQP_PREFIX + ORIGINAL_ENCODING;
// Message body type definitions

View File

@ -28,6 +28,8 @@ import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSup
import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.AMQP_VALUE_STRING;
import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_CONTENT_ENCODING;
import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_CONTENT_TYPE;
import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_ENCODED_FOOTER_PREFIX;
import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_ENCODED_MESSAGE_ANNOTATION_PREFIX;
import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_FIRST_ACQUIRER;
import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_FOOTER_PREFIX;
import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_HEADER;
@ -60,6 +62,7 @@ import java.util.UUID;
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException;
import org.apache.activemq.artemis.api.core.ICoreMessage;
import org.apache.activemq.artemis.core.message.impl.CoreMessageObjectPools;
import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
@ -88,6 +91,7 @@ import org.apache.qpid.proton.amqp.messaging.Header;
import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
import org.apache.qpid.proton.amqp.messaging.Properties;
import org.apache.qpid.proton.amqp.messaging.Section;
import org.apache.qpid.proton.codec.EncoderImpl;
import org.apache.qpid.proton.codec.WritableBuffer;
import io.netty.buffer.ByteBuf;
@ -280,7 +284,11 @@ public class AmqpCoreConverter {
}
}
setProperty(jms, JMS_AMQP_MESSAGE_ANNOTATION_PREFIX + key, entry.getValue());
try {
setProperty(jms, JMS_AMQP_MESSAGE_ANNOTATION_PREFIX + key, entry.getValue());
} catch (ActiveMQPropertyConversionException e) {
encodeUnsupportedMessagePropertyType(jms, JMS_AMQP_ENCODED_MESSAGE_ANNOTATION_PREFIX + key, entry.getValue());
}
}
}
@ -403,15 +411,38 @@ public class AmqpCoreConverter {
@SuppressWarnings("unchecked")
private static ServerJMSMessage processFooter(ServerJMSMessage jms, Footer footer) throws Exception {
if (footer != null && footer.getValue() != null) {
for (Map.Entry<Object, Object> entry : (Set<Map.Entry<Object, Object>>) footer.getValue().entrySet()) {
for (Map.Entry<Symbol, Object> entry : (Set<Map.Entry<Symbol, Object>>) footer.getValue().entrySet()) {
String key = entry.getKey().toString();
setProperty(jms, JMS_AMQP_FOOTER_PREFIX + key, entry.getValue());
try {
setProperty(jms, JMS_AMQP_FOOTER_PREFIX + key, entry.getValue());
} catch (ActiveMQPropertyConversionException e) {
encodeUnsupportedMessagePropertyType(jms, JMS_AMQP_ENCODED_FOOTER_PREFIX + key, entry.getValue());
}
}
}
return jms;
}
private static void encodeUnsupportedMessagePropertyType(ServerJMSMessage jms, String key, Object value) throws JMSException {
final ByteBuf buffer = PooledByteBufAllocator.DEFAULT.heapBuffer();
final EncoderImpl encoder = TLSEncode.getEncoder();
try {
encoder.setByteBuffer(new NettyWritable(buffer));
encoder.writeObject(value);
final byte[] encodedBytes = new byte[buffer.writerIndex()];
buffer.readBytes(encodedBytes);
setProperty(jms, key, encodedBytes);
} finally {
encoder.setByteBuffer((WritableBuffer) null);
buffer.release();
}
}
private static void setProperty(javax.jms.Message msg, String key, Object value) throws JMSException {
if (value instanceof UnsignedLong) {
long v = ((UnsignedLong) value).longValue();

View File

@ -31,6 +31,9 @@ import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSup
import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_CONTENT_ENCODING;
import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_CONTENT_TYPE;
import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_DELIVERY_ANNOTATION_PREFIX;
import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_ENCODED_DELIVERY_ANNOTATION_PREFIX;
import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_ENCODED_FOOTER_PREFIX;
import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_ENCODED_MESSAGE_ANNOTATION_PREFIX;
import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_FIRST_ACQUIRER;
import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_FOOTER_PREFIX;
import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_HEADER;
@ -91,7 +94,9 @@ import org.apache.qpid.proton.amqp.messaging.Header;
import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
import org.apache.qpid.proton.amqp.messaging.Properties;
import org.apache.qpid.proton.amqp.messaging.Section;
import org.apache.qpid.proton.codec.DecoderImpl;
import org.apache.qpid.proton.codec.EncoderImpl;
import org.apache.qpid.proton.codec.ReadableBuffer.ByteBufferReader;
import org.apache.qpid.proton.codec.WritableBuffer;
import org.jboss.logging.Logger;
@ -131,7 +136,7 @@ public class CoreAmqpConverter {
Map<Symbol, Object> daMap = null;
final Map<Symbol, Object> maMap = new HashMap<>();
Map<String, Object> apMap = null;
Map<Object, Object> footerMap = null;
Map<Symbol, Object> footerMap = null;
Section body = convertBody(message, maMap, properties);
@ -261,10 +266,21 @@ public class CoreAmqpConverter {
String name = key.substring(JMS_AMQP_DELIVERY_ANNOTATION_PREFIX.length());
daMap.put(Symbol.valueOf(name), message.getObjectProperty(key));
continue;
} else if (key.startsWith(JMS_AMQP_ENCODED_DELIVERY_ANNOTATION_PREFIX)) {
if (daMap == null) {
daMap = new HashMap<>();
}
String name = key.substring(JMS_AMQP_ENCODED_DELIVERY_ANNOTATION_PREFIX.length());
daMap.put(Symbol.valueOf(name), decodeEmbeddedAMQPType(message.getObjectProperty(key)));
continue;
} else if (key.startsWith(JMS_AMQP_MESSAGE_ANNOTATION_PREFIX)) {
String name = key.substring(JMS_AMQP_MESSAGE_ANNOTATION_PREFIX.length());
maMap.put(Symbol.valueOf(name), message.getObjectProperty(key));
continue;
} else if (key.startsWith(JMS_AMQP_ENCODED_MESSAGE_ANNOTATION_PREFIX)) {
String name = key.substring(JMS_AMQP_ENCODED_MESSAGE_ANNOTATION_PREFIX.length());
maMap.put(Symbol.valueOf(name), decodeEmbeddedAMQPType(message.getObjectProperty(key)));
continue;
} else if (key.equals(JMS_AMQP_CONTENT_TYPE)) {
properties.setContentType(Symbol.getSymbol(message.getStringProperty(key)));
continue;
@ -277,12 +293,19 @@ public class CoreAmqpConverter {
} else if (key.equals(JMS_AMQP_ORIGINAL_ENCODING)) {
// skip..remove annotation from previous inbound transformation
continue;
} else if (key.startsWith(JMS_AMQP_ENCODED_FOOTER_PREFIX)) {
if (footerMap == null) {
footerMap = new HashMap<>();
}
String name = key.substring(JMS_AMQP_ENCODED_FOOTER_PREFIX.length());
footerMap.put(Symbol.valueOf(name), decodeEmbeddedAMQPType(message.getObjectProperty(key)));
continue;
} else if (key.startsWith(JMS_AMQP_FOOTER_PREFIX)) {
if (footerMap == null) {
footerMap = new HashMap<>();
}
String name = key.substring(JMS_AMQP_FOOTER_PREFIX.length());
footerMap.put(name, message.getObjectProperty(key));
footerMap.put(Symbol.valueOf(name), message.getObjectProperty(key));
continue;
}
} else if (key.equals(Message.HDR_GROUP_ID.toString())) {
@ -351,6 +374,23 @@ public class CoreAmqpConverter {
}
}
private static Object decodeEmbeddedAMQPType(Object payload) {
final byte[] encodedType = (byte[]) payload;
final DecoderImpl decoder = TLSEncode.getDecoder();
Object decodedType = null;
decoder.setBuffer(ByteBufferReader.wrap(encodedType));
try {
decodedType = decoder.readObject();
} finally {
decoder.setBuffer(null);
}
return decodedType;
}
private static Section convertBody(ServerJMSMessage message, Map<Symbol, Object> maMap, Properties properties) throws JMSException {
Section body = null;

View File

@ -17,14 +17,22 @@
package org.apache.activemq.artemis.protocol.amqp.converter;
import java.nio.ByteBuffer;
import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.AMQP_NULL;
import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_ENCODED_DELIVERY_ANNOTATION_PREFIX;
import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_ENCODED_FOOTER_PREFIX;
import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_ENCODED_MESSAGE_ANNOTATION_PREFIX;
import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_ORIGINAL_ENCODING;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import io.netty.buffer.ByteBuf;
import org.apache.activemq.artemis.api.core.ICoreMessage;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.buffers.impl.ResetLimitWrappedActiveMQBuffer;
import org.apache.activemq.artemis.core.message.impl.CoreMessage;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSBytesMessage;
import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMapMessage;
@ -33,19 +41,25 @@ import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSStreamMe
import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSTextMessage;
import org.apache.activemq.artemis.protocol.amqp.util.NettyReadable;
import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable;
import org.apache.activemq.artemis.protocol.amqp.util.TLSEncode;
import org.apache.activemq.artemis.utils.collections.TypedProperties;
import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.AmqpSequence;
import org.apache.qpid.proton.amqp.messaging.AmqpValue;
import org.apache.qpid.proton.amqp.messaging.ApplicationProperties;
import org.apache.qpid.proton.amqp.messaging.Data;
import org.apache.qpid.proton.amqp.messaging.Footer;
import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
import org.apache.qpid.proton.codec.EncoderImpl;
import org.apache.qpid.proton.codec.WritableBuffer;
import org.apache.qpid.proton.message.Message;
import org.apache.qpid.proton.message.impl.MessageImpl;
import org.jboss.logging.Logger;
import org.junit.Assert;
import org.junit.Test;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
@ -232,6 +246,157 @@ public class TestConversions extends Assert {
assertEquals(text, textMessage.getText());
}
@SuppressWarnings("unchecked")
@Test
public void testConvertMessageWithMapInMessageAnnotations() throws Exception {
Map<String, Object> mapprop = createPropertiesMap();
ApplicationProperties properties = new ApplicationProperties(mapprop);
MessageImpl message = (MessageImpl) Message.Factory.create();
message.setApplicationProperties(properties);
final String annotationName = "x-opt-test-annotation";
final Symbol annotationNameSymbol = Symbol.valueOf(annotationName);
Map<String, String> embeddedMap = new LinkedHashMap<>();
embeddedMap.put("key1", "value1");
embeddedMap.put("key2", "value2");
embeddedMap.put("key3", "value3");
Map<Symbol, Object> annotationsMap = new LinkedHashMap<>();
annotationsMap.put(annotationNameSymbol, embeddedMap);
MessageAnnotations messageAnnotations = new MessageAnnotations(annotationsMap);
byte[] encodedEmbeddedMap = encodeObject(embeddedMap);
Map<String, Object> mapValues = new HashMap<>();
mapValues.put("somestr", "value");
mapValues.put("someint", Integer.valueOf(1));
message.setMessageAnnotations(messageAnnotations);
message.setBody(new AmqpValue(mapValues));
AMQPMessage encodedMessage = encodeAndCreateAMQPMessage(message);
ICoreMessage serverMessage = encodedMessage.toCore();
serverMessage.getReadOnlyBodyBuffer();
ServerJMSMapMessage mapMessage = (ServerJMSMapMessage) ServerJMSMessage.wrapCoreMessage(serverMessage);
mapMessage.decode();
verifyProperties(mapMessage);
assertEquals(1, mapMessage.getInt("someint"));
assertEquals("value", mapMessage.getString("somestr"));
assertTrue(mapMessage.propertyExists(JMS_AMQP_ENCODED_MESSAGE_ANNOTATION_PREFIX + annotationName));
assertArrayEquals(encodedEmbeddedMap, (byte[]) mapMessage.getObjectProperty(JMS_AMQP_ENCODED_MESSAGE_ANNOTATION_PREFIX + annotationName));
AMQPMessage newAMQP = CoreAmqpConverter.fromCore(mapMessage.getInnerMessage());
assertNotNull(newAMQP.getBody());
assertNotNull(newAMQP.getMessageAnnotations());
assertNotNull(newAMQP.getMessageAnnotations().getValue());
assertTrue(newAMQP.getMessageAnnotations().getValue().containsKey(annotationNameSymbol));
Object result = newAMQP.getMessageAnnotations().getValue().get(annotationNameSymbol);
assertTrue(result instanceof Map);
assertEquals(embeddedMap, (Map<String, String>) result);
}
@SuppressWarnings("unchecked")
@Test
public void testConvertMessageWithMapInFooter() throws Exception {
Map<String, Object> mapprop = createPropertiesMap();
ApplicationProperties properties = new ApplicationProperties(mapprop);
MessageImpl message = (MessageImpl) Message.Factory.create();
message.setApplicationProperties(properties);
final String footerName = "test-footer";
final Symbol footerNameSymbol = Symbol.valueOf(footerName);
Map<String, String> embeddedMap = new LinkedHashMap<>();
embeddedMap.put("key1", "value1");
embeddedMap.put("key2", "value2");
embeddedMap.put("key3", "value3");
Map<Symbol, Object> footerMap = new LinkedHashMap<>();
footerMap.put(footerNameSymbol, embeddedMap);
Footer messageFooter = new Footer(footerMap);
byte[] encodedEmbeddedMap = encodeObject(embeddedMap);
Map<String, Object> mapValues = new HashMap<>();
mapValues.put("somestr", "value");
mapValues.put("someint", Integer.valueOf(1));
message.setFooter(messageFooter);
message.setBody(new AmqpValue(mapValues));
AMQPMessage encodedMessage = encodeAndCreateAMQPMessage(message);
ICoreMessage serverMessage = encodedMessage.toCore();
serverMessage.getReadOnlyBodyBuffer();
ServerJMSMapMessage mapMessage = (ServerJMSMapMessage) ServerJMSMessage.wrapCoreMessage(serverMessage);
mapMessage.decode();
verifyProperties(mapMessage);
assertEquals(1, mapMessage.getInt("someint"));
assertEquals("value", mapMessage.getString("somestr"));
assertTrue(mapMessage.propertyExists(JMS_AMQP_ENCODED_FOOTER_PREFIX + footerName));
assertArrayEquals(encodedEmbeddedMap, (byte[]) mapMessage.getObjectProperty(JMS_AMQP_ENCODED_FOOTER_PREFIX + footerName));
AMQPMessage newAMQP = CoreAmqpConverter.fromCore(mapMessage.getInnerMessage());
assertNotNull(newAMQP.getBody());
assertNotNull(newAMQP.getFooter());
assertNotNull(newAMQP.getFooter().getValue());
assertTrue(newAMQP.getFooter().getValue().containsKey(footerNameSymbol));
Object result = newAMQP.getFooter().getValue().get(footerNameSymbol);
assertTrue(result instanceof Map);
assertEquals(embeddedMap, (Map<String, String>) result);
}
@SuppressWarnings("unchecked")
@Test
public void testConvertFromCoreWithEncodedDeliveryAnnotationProperty() throws Exception {
final String annotationName = "x-opt-test-annotation";
final Symbol annotationNameSymbol = Symbol.valueOf(annotationName);
Map<String, String> embeddedMap = new LinkedHashMap<>();
embeddedMap.put("key1", "value1");
embeddedMap.put("key2", "value2");
embeddedMap.put("key3", "value3");
byte[] encodedEmbeddedMap = encodeObject(embeddedMap);
ServerJMSMessage serverMessage = createMessage();
serverMessage.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_NULL);
serverMessage.setObjectProperty(JMS_AMQP_ENCODED_DELIVERY_ANNOTATION_PREFIX + annotationName, encodedEmbeddedMap);
serverMessage.encode();
AMQPMessage newAMQP = CoreAmqpConverter.fromCore(serverMessage.getInnerMessage());
assertNull(newAMQP.getBody());
assertNotNull(newAMQP.getDeliveryAnnotations());
assertNotNull(newAMQP.getDeliveryAnnotations().getValue());
assertTrue(newAMQP.getDeliveryAnnotations().getValue().containsKey(annotationNameSymbol));
Object result = newAMQP.getDeliveryAnnotations().getValue().get(annotationNameSymbol);
assertTrue(result instanceof Map);
assertEquals(embeddedMap, (Map<String, String>) result);
}
private byte[] encodeObject(Object toEncode) {
ByteBuf scratch = Unpooled.buffer();
EncoderImpl encoder = TLSEncode.getEncoder();
encoder.setByteBuffer(new NettyWritable(scratch));
try {
encoder.writeObject(toEncode);
} finally {
encoder.setByteBuffer((WritableBuffer) null);
}
byte[] result = new byte[scratch.writerIndex()];
scratch.readBytes(result);
return result;
}
@Test
public void testEditAndConvert() throws Exception {
@ -323,4 +488,15 @@ public class TestConversions extends Assert {
return new AMQPMessage(AMQPMessage.DEFAULT_MESSAGE_FORMAT, readable, null, null);
}
private ServerJMSMessage createMessage() {
return new ServerJMSMessage(newMessage(org.apache.activemq.artemis.api.core.Message.DEFAULT_TYPE));
}
private CoreMessage newMessage(byte messageType) {
CoreMessage message = new CoreMessage(0, 512);
message.setType(messageType);
((ResetLimitWrappedActiveMQBuffer) message.getBodyBuffer()).setMessage(null);
return message;
}
}

View File

@ -92,7 +92,7 @@
<mockito.version>2.25.0</mockito.version>
<netty.version>4.1.34.Final</netty.version>
<netty-tcnative-version>2.0.22.Final</netty-tcnative-version>
<proton.version>0.33.1</proton.version>
<proton.version>0.33.2</proton.version>
<resteasy.version>3.0.19.Final</resteasy.version>
<slf4j.version>1.7.21</slf4j.version>
<qpid.jms.version>0.43.0</qpid.jms.version>

View File

@ -171,7 +171,7 @@ public class AmqpExpiredMessageTest extends AmqpClientTestSupport {
sender.send(message);
sender.close();
assertEquals(1, queueView.getMessageCount());
Wait.assertEquals(1, queueView::getMessageCount);
// Now try and get the message
AmqpReceiver receiver = session.createReceiver(getQueueName());
@ -204,7 +204,7 @@ public class AmqpExpiredMessageTest extends AmqpClientTestSupport {
sender.send(message);
sender.close();
assertEquals(1, queueView.getMessageCount());
Wait.assertEquals(1, queueView::getMessageCount);
Thread.sleep(1000);

View File

@ -20,6 +20,7 @@ import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
@ -51,6 +52,7 @@ import org.apache.activemq.transport.amqp.client.AmqpSender;
import org.apache.activemq.transport.amqp.client.AmqpSession;
import org.apache.qpid.jms.JmsConnectionFactory;
import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.AmqpSequence;
import org.apache.qpid.proton.amqp.messaging.AmqpValue;
import org.apache.qpid.proton.amqp.messaging.Data;
@ -115,6 +117,52 @@ public class AmqpLargeMessageTest extends AmqpClientTestSupport {
}
}
@Test(timeout = 60000)
public void testSendAMQPMessageWithComplexAnnotationsReceiveCore() throws Exception {
server.getAddressSettingsRepository().addMatch("#", new AddressSettings().setDefaultAddressRoutingType(RoutingType.ANYCAST));
AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect());
try {
connection.connect();
String annotation = "x-opt-embedded-map";
Map<String, String> embeddedMap = new LinkedHashMap<>();
embeddedMap.put("test-key-1", "value-1");
embeddedMap.put("test-key-2", "value-2");
embeddedMap.put("test-key-3", "value-3");
AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender(testQueueName);
AmqpMessage message = createAmqpMessage((byte) 'A', PAYLOAD);
message.setApplicationProperty("IntProperty", (Integer) 42);
message.setDurable(true);
message.setMessageAnnotation(annotation, embeddedMap);
sender.send(message);
session.close();
Wait.assertEquals(1, () -> getMessageCount(server.getPostOffice(), testQueueName));
try (ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
Connection connection2 = factory.createConnection()) {
Session session2 = connection2.createSession(false, Session.AUTO_ACKNOWLEDGE);
connection2.start();
MessageConsumer consumer = session2.createConsumer(session2.createQueue(testQueueName));
Message received = consumer.receive(5000);
Assert.assertNotNull(received);
Assert.assertEquals(42, received.getIntProperty("IntProperty"));
connection2.close();
}
} finally {
connection.close();
}
}
@Test(timeout = 60000)
public void testSendAMQPReceiveOpenWire() throws Exception {
server.getAddressSettingsRepository().addMatch("#", new AddressSettings().setDefaultAddressRoutingType(RoutingType.ANYCAST));
@ -205,6 +253,62 @@ public class AmqpLargeMessageTest extends AmqpClientTestSupport {
}
}
@Test(timeout = 60000)
public void testSendAMQPMessageWithComplexAnnotationsReceiveAMQP() throws Exception {
server.getAddressSettingsRepository().addMatch("#", new AddressSettings().setDefaultAddressRoutingType(RoutingType.ANYCAST));
String testQueueName = "ConnectionFrameSize";
int nMsgs = 200;
AmqpClient client = createAmqpClient();
Symbol annotation = Symbol.valueOf("x-opt-embedded-map");
Map<String, String> embeddedMap = new LinkedHashMap<>();
embeddedMap.put("test-key-1", "value-1");
embeddedMap.put("test-key-2", "value-2");
embeddedMap.put("test-key-3", "value-3");
{
AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender(testQueueName);
AmqpMessage message = createAmqpMessage((byte) 'A', PAYLOAD);
message.setApplicationProperty("IntProperty", (Integer) 42);
message.setDurable(true);
message.setMessageAnnotation(annotation.toString(), embeddedMap);
sender.send(message);
session.close();
connection.close();
}
Wait.assertEquals(1, () -> getMessageCount(server.getPostOffice(), testQueueName));
{
AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession();
AmqpReceiver receiver = session.createReceiver(testQueueName);
receiver.flow(nMsgs);
AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
assertNotNull("Failed to read message with embedded map in annotations", message);
MessageImpl wrapped = (MessageImpl) message.getWrappedMessage();
if (wrapped.getBody() instanceof Data) {
Data data = (Data) wrapped.getBody();
System.out.println("received : message: " + data.getValue().getLength());
assertEquals(PAYLOAD, data.getValue().getLength());
}
assertNotNull(message.getWrappedMessage().getMessageAnnotations());
assertNotNull(message.getWrappedMessage().getMessageAnnotations().getValue());
assertEquals(embeddedMap, message.getWrappedMessage().getMessageAnnotations().getValue().get(annotation));
message.accept();
session.close();
connection.close();
}
}
@Test(timeout = 60000)
public void testSendAMQPReceiveAMQPViaJMSObjectMessage() throws Exception {
server.getAddressSettingsRepository().addMatch("#", new AddressSettings().setDefaultAddressRoutingType(RoutingType.ANYCAST));