diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AmqpMessageSupport.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AmqpMessageSupport.java index 4f468ba2a1..86151a1aa8 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AmqpMessageSupport.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AmqpMessageSupport.java @@ -23,7 +23,9 @@ import java.io.Serializable; import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; import java.util.HashMap; +import java.util.LinkedHashMap; import java.util.Map; +import java.util.Map.Entry; import java.util.zip.InflaterInputStream; import javax.jms.JMSException; @@ -313,13 +315,19 @@ public final class AmqpMessageSupport { * @throws JMSException if an error occurs in constructing or fetching the Map. */ public static Map getMapFromMessageBody(ActiveMQMapMessage message) throws JMSException { - final HashMap map = new HashMap(); + final HashMap map = new LinkedHashMap(); final Map contentMap = message.getContentMap(); if (contentMap != null) { - map.putAll(contentMap); + for (Entry entry : contentMap.entrySet()) { + Object value = entry.getValue(); + if (value instanceof byte[]) { + value = new Binary((byte[]) value); + } + map.put(entry.getKey(), value); + } } - return contentMap; + return map; } } diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSMappingInboundTransformer.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSMappingInboundTransformer.java index 79e4c2cbaf..e121cecc28 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSMappingInboundTransformer.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSMappingInboundTransformer.java @@ -36,6 +36,7 @@ import java.nio.CharBuffer; import java.nio.charset.CharacterCodingException; import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; +import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.Set; @@ -202,7 +203,12 @@ public class JMSMappingInboundTransformer extends InboundTransformer { ActiveMQMapMessage message = new ActiveMQMapMessage(); final Set> set = content.entrySet(); for (Map.Entry entry : set) { - message.setObject(entry.getKey(), entry.getValue()); + Object value = entry.getValue(); + if (value instanceof Binary) { + Binary binary = (Binary) value; + value = Arrays.copyOfRange(binary.getArray(), binary.getArrayOffset(), binary.getLength()); + } + message.setObject(entry.getKey(), value); } return message; } diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSInteroperabilityTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSInteroperabilityTest.java index fa61e1423e..e3d3b174ec 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSInteroperabilityTest.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSInteroperabilityTest.java @@ -34,6 +34,7 @@ import java.util.UUID; import javax.jms.BytesMessage; import javax.jms.Connection; import javax.jms.Destination; +import javax.jms.MapMessage; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageProducer; @@ -42,6 +43,7 @@ import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnection; +import org.apache.qpid.proton.amqp.Binary; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; @@ -253,7 +255,73 @@ public class JMSInteroperabilityTest extends JMSClientTestSupport { @SuppressWarnings("unchecked") @Test - public void testMapMessageSendReceive() throws Exception { + public void testMapMessageUsingPrimitiveSettersSendReceive() throws Exception { + Connection openwire = createJMSConnection(); + Connection amqp = createConnection(); + + openwire.start(); + amqp.start(); + + Session openwireSession = openwire.createSession(false, Session.AUTO_ACKNOWLEDGE); + Session amqpSession = amqp.createSession(false, Session.AUTO_ACKNOWLEDGE); + + Destination queue = openwireSession.createQueue(getDestinationName()); + + MessageProducer openwireProducer = openwireSession.createProducer(queue); + MessageConsumer amqpConsumer = amqpSession.createConsumer(queue); + + byte[] bytesValue = new byte[] { 1, 2, 3, 4, 5 }; + + // Create the Message + MapMessage outgoing = openwireSession.createMapMessage(); + + outgoing.setBoolean("boolean", true); + outgoing.setByte("byte", (byte) 10); + outgoing.setBytes("bytes", bytesValue); + outgoing.setChar("char", 'B'); + outgoing.setDouble("double", 24.42); + outgoing.setFloat("float", 3.14159f); + outgoing.setInt("integer", 1024); + outgoing.setLong("long", 8096l); + outgoing.setShort("short", (short) 255); + + openwireProducer.send(outgoing); + + // Now consume the MapMessage + Message received = amqpConsumer.receive(2000); + assertNotNull(received); + assertTrue("Expected MapMessage but got " + received, received instanceof ObjectMessage); + ObjectMessage incoming = (ObjectMessage) received; + + Map incomingMap = (Map) incoming.getObject(); + + assertEquals(true, incomingMap.get("boolean")); + assertEquals(10, (byte) incomingMap.get("byte")); + assertEquals('B', incomingMap.get("char")); + assertEquals(24.42, (double) incomingMap.get("double"), 0.5); + assertEquals(3.14159f, (float) incomingMap.get("float"), 0.5f); + assertEquals(1024, incomingMap.get("integer")); + assertEquals(8096l, incomingMap.get("long")); + assertEquals(255, (short) incomingMap.get("short")); + + // Test for the byte array which will be in an AMQP Binary as this message + // is received as an ObjectMessage by Qpid JMS + Object incomingValue = incomingMap.get("bytes"); + assertNotNull(incomingValue); + assertTrue(incomingValue instanceof Binary); + Binary incomingBinary = (Binary) incomingValue; + byte[] incomingBytes = Arrays.copyOfRange(incomingBinary.getArray(), incomingBinary.getArrayOffset(), incomingBinary.getLength()); + assertTrue(Arrays.equals(bytesValue, incomingBytes)); + + amqp.close(); + openwire.close(); + } + + //----- Tests for OpenWire <-> Qpid JMS using ObjectMessage --------------// + + @SuppressWarnings("unchecked") + @Test + public void testMapInObjectMessageSendReceive() throws Exception { Connection openwire = createJMSConnection(); Connection amqp = createConnection(); @@ -284,7 +352,7 @@ public class JMSInteroperabilityTest extends JMSClientTestSupport { openwireProducer.send(outgoing); - // Now consumer the ObjectMessage + // Now consume the ObjectMessage Message received = amqpConsumer.receive(2000); assertNotNull(received); assertTrue("Expected ObjectMessage but got " + received, received instanceof ObjectMessage); @@ -300,8 +368,6 @@ public class JMSInteroperabilityTest extends JMSClientTestSupport { openwire.close(); } - //----- Tests for OpenWire <-> Qpid JMS using ObjectMessage --------------// - @Test public void testQpidToOpenWireObjectMessage() throws Exception { @@ -327,7 +393,7 @@ public class JMSInteroperabilityTest extends JMSClientTestSupport { outgoing.setObject(UUID.randomUUID()); amqpProducer.send(outgoing); - // Now consumer the ObjectMessage + // Now consume the ObjectMessage Message received = openwireConsumer.receive(2000); assertNotNull(received); LOG.info("Read new message: {}", received); @@ -366,7 +432,7 @@ public class JMSInteroperabilityTest extends JMSClientTestSupport { outgoing.setObject(UUID.randomUUID()); openwireProducer.send(outgoing); - // Now consumer the ObjectMessage + // Now consume the ObjectMessage Message received = amqpConsumer.receive(2000); assertNotNull(received); LOG.info("Read new message: {}", received); @@ -407,7 +473,7 @@ public class JMSInteroperabilityTest extends JMSClientTestSupport { outgoing.setObject(UUID.randomUUID()); openwireProducer.send(outgoing); - // Now consumer the ObjectMessage + // Now consume the ObjectMessage Message received = amqpConsumer.receive(2000); assertNotNull(received); LOG.info("Read new message: {}", received); @@ -454,7 +520,7 @@ public class JMSInteroperabilityTest extends JMSClientTestSupport { openwireProducer.send(outgoing); - // Now consumer the ObjectMessage + // Now consume the ObjectMessage Message received = amqpConsumer.receive(2000); assertNotNull(received); assertTrue(received instanceof ObjectMessage); @@ -499,7 +565,7 @@ public class JMSInteroperabilityTest extends JMSClientTestSupport { amqpProducer.send(outgoing); } - // Now consumer the message + // Now consume the message for (int i = 0; i < NUM_MESSAGES; ++i) { Message received = amqpConsumer.receive(2000); assertNotNull(received); diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/message/JMSMappingInboundTransformerTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/message/JMSMappingInboundTransformerTest.java index 1427b5a520..6ac080a899 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/message/JMSMappingInboundTransformerTest.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/message/JMSMappingInboundTransformerTest.java @@ -24,12 +24,14 @@ import static org.junit.Assert.assertTrue; import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; import java.util.ArrayList; +import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.UUID; import javax.jms.Destination; +import javax.jms.MapMessage; import javax.jms.Queue; import javax.jms.TemporaryQueue; import javax.jms.TemporaryTopic; @@ -436,6 +438,39 @@ public class JMSMappingInboundTransformerTest { assertEquals("Unexpected message class type", ActiveMQMapMessage.class, jmsMessage.getClass()); } + /** + * Test that an amqp-value body containing a map that has an AMQP Binary as one of the + * entries encoded into the Map results in an MapMessage where a byte array can be read + * from the entry. + * + * @throws Exception if an error occurs during the test. + */ + @Test + public void testCreateAmqpMapMessageFromAmqpValueWithMapContainingBinaryEntry() throws Exception { + final String ENTRY_NAME = "bytesEntry"; + + Message message = Proton.message(); + Map map = new HashMap(); + + byte[] inputBytes = new byte[] { 1, 2, 3, 4, 5 }; + map.put(ENTRY_NAME, new Binary(inputBytes)); + + message.setBody(new AmqpValue(map)); + + EncodedMessage em = encodeMessage(message); + + JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(); + javax.jms.Message jmsMessage = transformer.transform(em); + + assertNotNull("Message should not be null", jmsMessage); + assertEquals("Unexpected message class type", ActiveMQMapMessage.class, jmsMessage.getClass()); + + MapMessage mapMessage = (MapMessage) jmsMessage; + byte[] outputBytes = mapMessage.getBytes(ENTRY_NAME); + assertNotNull(outputBytes); + assertTrue(Arrays.equals(inputBytes, outputBytes)); + } + /** * Test that an amqp-value body containing a list results in an StreamMessage * when not otherwise annotated to indicate the type of JMS message it is. diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformerTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformerTest.java index ee69650487..f0167b7a5f 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformerTest.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformerTest.java @@ -275,6 +275,34 @@ public class JMSMappingOutboundTransformerTest { assertTrue(((AmqpValue) amqp.getBody()).getValue() instanceof Map); } + @Test + public void testConvertMapMessageToAmqpMessageWithByteArrayValueInBody() throws Exception { + final byte[] byteArray = new byte[] { 1, 2, 3, 4, 5 }; + + ActiveMQMapMessage outbound = createMapMessage(); + outbound.setBytes("bytes", byteArray); + outbound.onSend(); + outbound.storeContent(); + + JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer(); + + EncodedMessage encoded = transformer.transform(outbound); + assertNotNull(encoded); + + Message amqp = encoded.decode(); + + assertNotNull(amqp.getBody()); + assertTrue(amqp.getBody() instanceof AmqpValue); + assertTrue(((AmqpValue) amqp.getBody()).getValue() instanceof Map); + + @SuppressWarnings("unchecked") + Map amqpMap = (Map) ((AmqpValue) amqp.getBody()).getValue(); + + assertEquals(1, amqpMap.size()); + Binary readByteArray = (Binary) amqpMap.get("bytes"); + assertNotNull(readByteArray); + } + @Test public void testConvertMapMessageToAmqpMessage() throws Exception { ActiveMQMapMessage outbound = createMapMessage();