Better interop for MapMessage with Binary value in the entries of the
payload, should convert back and forth the byte to allow Message to be
treated as a MapMessage and not fall back to a BytesMessage encoding.
(cherry picked from commit d88c4e46ec)
This commit is contained in:
Timothy Bish 2016-09-29 11:09:34 -04:00
parent aa32a0f792
commit a6d2a16b4c
5 changed files with 156 additions and 13 deletions

View File

@ -23,7 +23,9 @@ import java.io.Serializable;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.zip.InflaterInputStream;
import javax.jms.JMSException;
@ -313,13 +315,19 @@ public final class AmqpMessageSupport {
* @throws JMSException if an error occurs in constructing or fetching the Map.
*/
public static Map<String, Object> getMapFromMessageBody(ActiveMQMapMessage message) throws JMSException {
final HashMap<String, Object> map = new HashMap<String, Object>();
final HashMap<String, Object> map = new LinkedHashMap<String, Object>();
final Map<String, Object> contentMap = message.getContentMap();
if (contentMap != null) {
map.putAll(contentMap);
for (Entry<String, Object> entry : contentMap.entrySet()) {
Object value = entry.getValue();
if (value instanceof byte[]) {
value = new Binary((byte[]) value);
}
map.put(entry.getKey(), value);
}
}
return contentMap;
return map;
}
}

View File

@ -36,6 +36,7 @@ import java.nio.CharBuffer;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
@ -202,7 +203,12 @@ public class JMSMappingInboundTransformer extends InboundTransformer {
ActiveMQMapMessage message = new ActiveMQMapMessage();
final Set<Map.Entry<String, Object>> set = content.entrySet();
for (Map.Entry<String, Object> entry : set) {
message.setObject(entry.getKey(), entry.getValue());
Object value = entry.getValue();
if (value instanceof Binary) {
Binary binary = (Binary) value;
value = Arrays.copyOfRange(binary.getArray(), binary.getArrayOffset(), binary.getLength());
}
message.setObject(entry.getKey(), value);
}
return message;
}

View File

@ -34,6 +34,7 @@ import java.util.UUID;
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
@ -42,6 +43,7 @@ import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.qpid.proton.amqp.Binary;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@ -253,7 +255,73 @@ public class JMSInteroperabilityTest extends JMSClientTestSupport {
@SuppressWarnings("unchecked")
@Test
public void testMapMessageSendReceive() throws Exception {
public void testMapMessageUsingPrimitiveSettersSendReceive() throws Exception {
Connection openwire = createJMSConnection();
Connection amqp = createConnection();
openwire.start();
amqp.start();
Session openwireSession = openwire.createSession(false, Session.AUTO_ACKNOWLEDGE);
Session amqpSession = amqp.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination queue = openwireSession.createQueue(getDestinationName());
MessageProducer openwireProducer = openwireSession.createProducer(queue);
MessageConsumer amqpConsumer = amqpSession.createConsumer(queue);
byte[] bytesValue = new byte[] { 1, 2, 3, 4, 5 };
// Create the Message
MapMessage outgoing = openwireSession.createMapMessage();
outgoing.setBoolean("boolean", true);
outgoing.setByte("byte", (byte) 10);
outgoing.setBytes("bytes", bytesValue);
outgoing.setChar("char", 'B');
outgoing.setDouble("double", 24.42);
outgoing.setFloat("float", 3.14159f);
outgoing.setInt("integer", 1024);
outgoing.setLong("long", 8096l);
outgoing.setShort("short", (short) 255);
openwireProducer.send(outgoing);
// Now consume the MapMessage
Message received = amqpConsumer.receive(2000);
assertNotNull(received);
assertTrue("Expected MapMessage but got " + received, received instanceof ObjectMessage);
ObjectMessage incoming = (ObjectMessage) received;
Map<String, Object> incomingMap = (Map<String, Object>) incoming.getObject();
assertEquals(true, incomingMap.get("boolean"));
assertEquals(10, (byte) incomingMap.get("byte"));
assertEquals('B', incomingMap.get("char"));
assertEquals(24.42, (double) incomingMap.get("double"), 0.5);
assertEquals(3.14159f, (float) incomingMap.get("float"), 0.5f);
assertEquals(1024, incomingMap.get("integer"));
assertEquals(8096l, incomingMap.get("long"));
assertEquals(255, (short) incomingMap.get("short"));
// Test for the byte array which will be in an AMQP Binary as this message
// is received as an ObjectMessage by Qpid JMS
Object incomingValue = incomingMap.get("bytes");
assertNotNull(incomingValue);
assertTrue(incomingValue instanceof Binary);
Binary incomingBinary = (Binary) incomingValue;
byte[] incomingBytes = Arrays.copyOfRange(incomingBinary.getArray(), incomingBinary.getArrayOffset(), incomingBinary.getLength());
assertTrue(Arrays.equals(bytesValue, incomingBytes));
amqp.close();
openwire.close();
}
//----- Tests for OpenWire <-> Qpid JMS using ObjectMessage --------------//
@SuppressWarnings("unchecked")
@Test
public void testMapInObjectMessageSendReceive() throws Exception {
Connection openwire = createJMSConnection();
Connection amqp = createConnection();
@ -284,7 +352,7 @@ public class JMSInteroperabilityTest extends JMSClientTestSupport {
openwireProducer.send(outgoing);
// Now consumer the ObjectMessage
// Now consume the ObjectMessage
Message received = amqpConsumer.receive(2000);
assertNotNull(received);
assertTrue("Expected ObjectMessage but got " + received, received instanceof ObjectMessage);
@ -300,8 +368,6 @@ public class JMSInteroperabilityTest extends JMSClientTestSupport {
openwire.close();
}
//----- Tests for OpenWire <-> Qpid JMS using ObjectMessage --------------//
@Test
public void testQpidToOpenWireObjectMessage() throws Exception {
@ -327,7 +393,7 @@ public class JMSInteroperabilityTest extends JMSClientTestSupport {
outgoing.setObject(UUID.randomUUID());
amqpProducer.send(outgoing);
// Now consumer the ObjectMessage
// Now consume the ObjectMessage
Message received = openwireConsumer.receive(2000);
assertNotNull(received);
LOG.info("Read new message: {}", received);
@ -366,7 +432,7 @@ public class JMSInteroperabilityTest extends JMSClientTestSupport {
outgoing.setObject(UUID.randomUUID());
openwireProducer.send(outgoing);
// Now consumer the ObjectMessage
// Now consume the ObjectMessage
Message received = amqpConsumer.receive(2000);
assertNotNull(received);
LOG.info("Read new message: {}", received);
@ -407,7 +473,7 @@ public class JMSInteroperabilityTest extends JMSClientTestSupport {
outgoing.setObject(UUID.randomUUID());
openwireProducer.send(outgoing);
// Now consumer the ObjectMessage
// Now consume the ObjectMessage
Message received = amqpConsumer.receive(2000);
assertNotNull(received);
LOG.info("Read new message: {}", received);
@ -454,7 +520,7 @@ public class JMSInteroperabilityTest extends JMSClientTestSupport {
openwireProducer.send(outgoing);
// Now consumer the ObjectMessage
// Now consume the ObjectMessage
Message received = amqpConsumer.receive(2000);
assertNotNull(received);
assertTrue(received instanceof ObjectMessage);
@ -499,7 +565,7 @@ public class JMSInteroperabilityTest extends JMSClientTestSupport {
amqpProducer.send(outgoing);
}
// Now consumer the message
// Now consume the message
for (int i = 0; i < NUM_MESSAGES; ++i) {
Message received = amqpConsumer.receive(2000);
assertNotNull(received);

View File

@ -24,12 +24,14 @@ import static org.junit.Assert.assertTrue;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import javax.jms.Destination;
import javax.jms.MapMessage;
import javax.jms.Queue;
import javax.jms.TemporaryQueue;
import javax.jms.TemporaryTopic;
@ -436,6 +438,39 @@ public class JMSMappingInboundTransformerTest {
assertEquals("Unexpected message class type", ActiveMQMapMessage.class, jmsMessage.getClass());
}
/**
* Test that an amqp-value body containing a map that has an AMQP Binary as one of the
* entries encoded into the Map results in an MapMessage where a byte array can be read
* from the entry.
*
* @throws Exception if an error occurs during the test.
*/
@Test
public void testCreateAmqpMapMessageFromAmqpValueWithMapContainingBinaryEntry() throws Exception {
final String ENTRY_NAME = "bytesEntry";
Message message = Proton.message();
Map<String, Object> map = new HashMap<String, Object>();
byte[] inputBytes = new byte[] { 1, 2, 3, 4, 5 };
map.put(ENTRY_NAME, new Binary(inputBytes));
message.setBody(new AmqpValue(map));
EncodedMessage em = encodeMessage(message);
JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer();
javax.jms.Message jmsMessage = transformer.transform(em);
assertNotNull("Message should not be null", jmsMessage);
assertEquals("Unexpected message class type", ActiveMQMapMessage.class, jmsMessage.getClass());
MapMessage mapMessage = (MapMessage) jmsMessage;
byte[] outputBytes = mapMessage.getBytes(ENTRY_NAME);
assertNotNull(outputBytes);
assertTrue(Arrays.equals(inputBytes, outputBytes));
}
/**
* Test that an amqp-value body containing a list results in an StreamMessage
* when not otherwise annotated to indicate the type of JMS message it is.

View File

@ -275,6 +275,34 @@ public class JMSMappingOutboundTransformerTest {
assertTrue(((AmqpValue) amqp.getBody()).getValue() instanceof Map);
}
@Test
public void testConvertMapMessageToAmqpMessageWithByteArrayValueInBody() throws Exception {
final byte[] byteArray = new byte[] { 1, 2, 3, 4, 5 };
ActiveMQMapMessage outbound = createMapMessage();
outbound.setBytes("bytes", byteArray);
outbound.onSend();
outbound.storeContent();
JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer();
EncodedMessage encoded = transformer.transform(outbound);
assertNotNull(encoded);
Message amqp = encoded.decode();
assertNotNull(amqp.getBody());
assertTrue(amqp.getBody() instanceof AmqpValue);
assertTrue(((AmqpValue) amqp.getBody()).getValue() instanceof Map);
@SuppressWarnings("unchecked")
Map<Object, Object> amqpMap = (Map<Object, Object>) ((AmqpValue) amqp.getBody()).getValue();
assertEquals(1, amqpMap.size());
Binary readByteArray = (Binary) amqpMap.get("bytes");
assertNotNull(readByteArray);
}
@Test
public void testConvertMapMessageToAmqpMessage() throws Exception {
ActiveMQMapMessage outbound = createMapMessage();