mirror of https://github.com/apache/activemq.git
Better interop for MapMessage with Binary value in the entries of the payload, should convert back and forth the byte to allow Message to be treated as a MapMessage and not fall back to a BytesMessage encoding.
This commit is contained in:
parent
351faf2699
commit
d88c4e46ec
|
@ -23,7 +23,9 @@ import java.io.Serializable;
|
||||||
import java.nio.charset.Charset;
|
import java.nio.charset.Charset;
|
||||||
import java.nio.charset.StandardCharsets;
|
import java.nio.charset.StandardCharsets;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
|
import java.util.LinkedHashMap;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.Map.Entry;
|
||||||
import java.util.zip.InflaterInputStream;
|
import java.util.zip.InflaterInputStream;
|
||||||
|
|
||||||
import javax.jms.JMSException;
|
import javax.jms.JMSException;
|
||||||
|
@ -313,13 +315,19 @@ public final class AmqpMessageSupport {
|
||||||
* @throws JMSException if an error occurs in constructing or fetching the Map.
|
* @throws JMSException if an error occurs in constructing or fetching the Map.
|
||||||
*/
|
*/
|
||||||
public static Map<String, Object> getMapFromMessageBody(ActiveMQMapMessage message) throws JMSException {
|
public static Map<String, Object> getMapFromMessageBody(ActiveMQMapMessage message) throws JMSException {
|
||||||
final HashMap<String, Object> map = new HashMap<String, Object>();
|
final HashMap<String, Object> map = new LinkedHashMap<String, Object>();
|
||||||
|
|
||||||
final Map<String, Object> contentMap = message.getContentMap();
|
final Map<String, Object> contentMap = message.getContentMap();
|
||||||
if (contentMap != null) {
|
if (contentMap != null) {
|
||||||
map.putAll(contentMap);
|
for (Entry<String, Object> entry : contentMap.entrySet()) {
|
||||||
|
Object value = entry.getValue();
|
||||||
|
if (value instanceof byte[]) {
|
||||||
|
value = new Binary((byte[]) value);
|
||||||
|
}
|
||||||
|
map.put(entry.getKey(), value);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return contentMap;
|
return map;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -36,6 +36,7 @@ import java.nio.CharBuffer;
|
||||||
import java.nio.charset.CharacterCodingException;
|
import java.nio.charset.CharacterCodingException;
|
||||||
import java.nio.charset.Charset;
|
import java.nio.charset.Charset;
|
||||||
import java.nio.charset.StandardCharsets;
|
import java.nio.charset.StandardCharsets;
|
||||||
|
import java.util.Arrays;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
@ -202,7 +203,12 @@ public class JMSMappingInboundTransformer extends InboundTransformer {
|
||||||
ActiveMQMapMessage message = new ActiveMQMapMessage();
|
ActiveMQMapMessage message = new ActiveMQMapMessage();
|
||||||
final Set<Map.Entry<String, Object>> set = content.entrySet();
|
final Set<Map.Entry<String, Object>> set = content.entrySet();
|
||||||
for (Map.Entry<String, Object> entry : set) {
|
for (Map.Entry<String, Object> entry : set) {
|
||||||
message.setObject(entry.getKey(), entry.getValue());
|
Object value = entry.getValue();
|
||||||
|
if (value instanceof Binary) {
|
||||||
|
Binary binary = (Binary) value;
|
||||||
|
value = Arrays.copyOfRange(binary.getArray(), binary.getArrayOffset(), binary.getLength());
|
||||||
|
}
|
||||||
|
message.setObject(entry.getKey(), value);
|
||||||
}
|
}
|
||||||
return message;
|
return message;
|
||||||
}
|
}
|
||||||
|
|
|
@ -34,6 +34,7 @@ import java.util.UUID;
|
||||||
import javax.jms.BytesMessage;
|
import javax.jms.BytesMessage;
|
||||||
import javax.jms.Connection;
|
import javax.jms.Connection;
|
||||||
import javax.jms.Destination;
|
import javax.jms.Destination;
|
||||||
|
import javax.jms.MapMessage;
|
||||||
import javax.jms.Message;
|
import javax.jms.Message;
|
||||||
import javax.jms.MessageConsumer;
|
import javax.jms.MessageConsumer;
|
||||||
import javax.jms.MessageProducer;
|
import javax.jms.MessageProducer;
|
||||||
|
@ -42,6 +43,7 @@ import javax.jms.Session;
|
||||||
import javax.jms.TextMessage;
|
import javax.jms.TextMessage;
|
||||||
|
|
||||||
import org.apache.activemq.ActiveMQConnection;
|
import org.apache.activemq.ActiveMQConnection;
|
||||||
|
import org.apache.qpid.proton.amqp.Binary;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.junit.runner.RunWith;
|
import org.junit.runner.RunWith;
|
||||||
import org.junit.runners.Parameterized;
|
import org.junit.runners.Parameterized;
|
||||||
|
@ -253,7 +255,73 @@ public class JMSInteroperabilityTest extends JMSClientTestSupport {
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
@Test
|
@Test
|
||||||
public void testMapMessageSendReceive() throws Exception {
|
public void testMapMessageUsingPrimitiveSettersSendReceive() throws Exception {
|
||||||
|
Connection openwire = createJMSConnection();
|
||||||
|
Connection amqp = createConnection();
|
||||||
|
|
||||||
|
openwire.start();
|
||||||
|
amqp.start();
|
||||||
|
|
||||||
|
Session openwireSession = openwire.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
|
Session amqpSession = amqp.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
|
|
||||||
|
Destination queue = openwireSession.createQueue(getDestinationName());
|
||||||
|
|
||||||
|
MessageProducer openwireProducer = openwireSession.createProducer(queue);
|
||||||
|
MessageConsumer amqpConsumer = amqpSession.createConsumer(queue);
|
||||||
|
|
||||||
|
byte[] bytesValue = new byte[] { 1, 2, 3, 4, 5 };
|
||||||
|
|
||||||
|
// Create the Message
|
||||||
|
MapMessage outgoing = openwireSession.createMapMessage();
|
||||||
|
|
||||||
|
outgoing.setBoolean("boolean", true);
|
||||||
|
outgoing.setByte("byte", (byte) 10);
|
||||||
|
outgoing.setBytes("bytes", bytesValue);
|
||||||
|
outgoing.setChar("char", 'B');
|
||||||
|
outgoing.setDouble("double", 24.42);
|
||||||
|
outgoing.setFloat("float", 3.14159f);
|
||||||
|
outgoing.setInt("integer", 1024);
|
||||||
|
outgoing.setLong("long", 8096l);
|
||||||
|
outgoing.setShort("short", (short) 255);
|
||||||
|
|
||||||
|
openwireProducer.send(outgoing);
|
||||||
|
|
||||||
|
// Now consume the MapMessage
|
||||||
|
Message received = amqpConsumer.receive(2000);
|
||||||
|
assertNotNull(received);
|
||||||
|
assertTrue("Expected MapMessage but got " + received, received instanceof ObjectMessage);
|
||||||
|
ObjectMessage incoming = (ObjectMessage) received;
|
||||||
|
|
||||||
|
Map<String, Object> incomingMap = (Map<String, Object>) incoming.getObject();
|
||||||
|
|
||||||
|
assertEquals(true, incomingMap.get("boolean"));
|
||||||
|
assertEquals(10, (byte) incomingMap.get("byte"));
|
||||||
|
assertEquals('B', incomingMap.get("char"));
|
||||||
|
assertEquals(24.42, (double) incomingMap.get("double"), 0.5);
|
||||||
|
assertEquals(3.14159f, (float) incomingMap.get("float"), 0.5f);
|
||||||
|
assertEquals(1024, incomingMap.get("integer"));
|
||||||
|
assertEquals(8096l, incomingMap.get("long"));
|
||||||
|
assertEquals(255, (short) incomingMap.get("short"));
|
||||||
|
|
||||||
|
// Test for the byte array which will be in an AMQP Binary as this message
|
||||||
|
// is received as an ObjectMessage by Qpid JMS
|
||||||
|
Object incomingValue = incomingMap.get("bytes");
|
||||||
|
assertNotNull(incomingValue);
|
||||||
|
assertTrue(incomingValue instanceof Binary);
|
||||||
|
Binary incomingBinary = (Binary) incomingValue;
|
||||||
|
byte[] incomingBytes = Arrays.copyOfRange(incomingBinary.getArray(), incomingBinary.getArrayOffset(), incomingBinary.getLength());
|
||||||
|
assertTrue(Arrays.equals(bytesValue, incomingBytes));
|
||||||
|
|
||||||
|
amqp.close();
|
||||||
|
openwire.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
//----- Tests for OpenWire <-> Qpid JMS using ObjectMessage --------------//
|
||||||
|
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
@Test
|
||||||
|
public void testMapInObjectMessageSendReceive() throws Exception {
|
||||||
Connection openwire = createJMSConnection();
|
Connection openwire = createJMSConnection();
|
||||||
Connection amqp = createConnection();
|
Connection amqp = createConnection();
|
||||||
|
|
||||||
|
@ -284,7 +352,7 @@ public class JMSInteroperabilityTest extends JMSClientTestSupport {
|
||||||
|
|
||||||
openwireProducer.send(outgoing);
|
openwireProducer.send(outgoing);
|
||||||
|
|
||||||
// Now consumer the ObjectMessage
|
// Now consume the ObjectMessage
|
||||||
Message received = amqpConsumer.receive(2000);
|
Message received = amqpConsumer.receive(2000);
|
||||||
assertNotNull(received);
|
assertNotNull(received);
|
||||||
assertTrue("Expected ObjectMessage but got " + received, received instanceof ObjectMessage);
|
assertTrue("Expected ObjectMessage but got " + received, received instanceof ObjectMessage);
|
||||||
|
@ -300,8 +368,6 @@ public class JMSInteroperabilityTest extends JMSClientTestSupport {
|
||||||
openwire.close();
|
openwire.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
//----- Tests for OpenWire <-> Qpid JMS using ObjectMessage --------------//
|
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testQpidToOpenWireObjectMessage() throws Exception {
|
public void testQpidToOpenWireObjectMessage() throws Exception {
|
||||||
|
|
||||||
|
@ -327,7 +393,7 @@ public class JMSInteroperabilityTest extends JMSClientTestSupport {
|
||||||
outgoing.setObject(UUID.randomUUID());
|
outgoing.setObject(UUID.randomUUID());
|
||||||
amqpProducer.send(outgoing);
|
amqpProducer.send(outgoing);
|
||||||
|
|
||||||
// Now consumer the ObjectMessage
|
// Now consume the ObjectMessage
|
||||||
Message received = openwireConsumer.receive(2000);
|
Message received = openwireConsumer.receive(2000);
|
||||||
assertNotNull(received);
|
assertNotNull(received);
|
||||||
LOG.info("Read new message: {}", received);
|
LOG.info("Read new message: {}", received);
|
||||||
|
@ -366,7 +432,7 @@ public class JMSInteroperabilityTest extends JMSClientTestSupport {
|
||||||
outgoing.setObject(UUID.randomUUID());
|
outgoing.setObject(UUID.randomUUID());
|
||||||
openwireProducer.send(outgoing);
|
openwireProducer.send(outgoing);
|
||||||
|
|
||||||
// Now consumer the ObjectMessage
|
// Now consume the ObjectMessage
|
||||||
Message received = amqpConsumer.receive(2000);
|
Message received = amqpConsumer.receive(2000);
|
||||||
assertNotNull(received);
|
assertNotNull(received);
|
||||||
LOG.info("Read new message: {}", received);
|
LOG.info("Read new message: {}", received);
|
||||||
|
@ -407,7 +473,7 @@ public class JMSInteroperabilityTest extends JMSClientTestSupport {
|
||||||
outgoing.setObject(UUID.randomUUID());
|
outgoing.setObject(UUID.randomUUID());
|
||||||
openwireProducer.send(outgoing);
|
openwireProducer.send(outgoing);
|
||||||
|
|
||||||
// Now consumer the ObjectMessage
|
// Now consume the ObjectMessage
|
||||||
Message received = amqpConsumer.receive(2000);
|
Message received = amqpConsumer.receive(2000);
|
||||||
assertNotNull(received);
|
assertNotNull(received);
|
||||||
LOG.info("Read new message: {}", received);
|
LOG.info("Read new message: {}", received);
|
||||||
|
@ -454,7 +520,7 @@ public class JMSInteroperabilityTest extends JMSClientTestSupport {
|
||||||
|
|
||||||
openwireProducer.send(outgoing);
|
openwireProducer.send(outgoing);
|
||||||
|
|
||||||
// Now consumer the ObjectMessage
|
// Now consume the ObjectMessage
|
||||||
Message received = amqpConsumer.receive(2000);
|
Message received = amqpConsumer.receive(2000);
|
||||||
assertNotNull(received);
|
assertNotNull(received);
|
||||||
assertTrue(received instanceof ObjectMessage);
|
assertTrue(received instanceof ObjectMessage);
|
||||||
|
@ -499,7 +565,7 @@ public class JMSInteroperabilityTest extends JMSClientTestSupport {
|
||||||
amqpProducer.send(outgoing);
|
amqpProducer.send(outgoing);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Now consumer the message
|
// Now consume the message
|
||||||
for (int i = 0; i < NUM_MESSAGES; ++i) {
|
for (int i = 0; i < NUM_MESSAGES; ++i) {
|
||||||
Message received = amqpConsumer.receive(2000);
|
Message received = amqpConsumer.receive(2000);
|
||||||
assertNotNull(received);
|
assertNotNull(received);
|
||||||
|
|
|
@ -24,12 +24,14 @@ import static org.junit.Assert.assertTrue;
|
||||||
import java.nio.charset.Charset;
|
import java.nio.charset.Charset;
|
||||||
import java.nio.charset.StandardCharsets;
|
import java.nio.charset.StandardCharsets;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
import java.util.Arrays;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.UUID;
|
import java.util.UUID;
|
||||||
|
|
||||||
import javax.jms.Destination;
|
import javax.jms.Destination;
|
||||||
|
import javax.jms.MapMessage;
|
||||||
import javax.jms.Queue;
|
import javax.jms.Queue;
|
||||||
import javax.jms.TemporaryQueue;
|
import javax.jms.TemporaryQueue;
|
||||||
import javax.jms.TemporaryTopic;
|
import javax.jms.TemporaryTopic;
|
||||||
|
@ -436,6 +438,39 @@ public class JMSMappingInboundTransformerTest {
|
||||||
assertEquals("Unexpected message class type", ActiveMQMapMessage.class, jmsMessage.getClass());
|
assertEquals("Unexpected message class type", ActiveMQMapMessage.class, jmsMessage.getClass());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test that an amqp-value body containing a map that has an AMQP Binary as one of the
|
||||||
|
* entries encoded into the Map results in an MapMessage where a byte array can be read
|
||||||
|
* from the entry.
|
||||||
|
*
|
||||||
|
* @throws Exception if an error occurs during the test.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testCreateAmqpMapMessageFromAmqpValueWithMapContainingBinaryEntry() throws Exception {
|
||||||
|
final String ENTRY_NAME = "bytesEntry";
|
||||||
|
|
||||||
|
Message message = Proton.message();
|
||||||
|
Map<String, Object> map = new HashMap<String, Object>();
|
||||||
|
|
||||||
|
byte[] inputBytes = new byte[] { 1, 2, 3, 4, 5 };
|
||||||
|
map.put(ENTRY_NAME, new Binary(inputBytes));
|
||||||
|
|
||||||
|
message.setBody(new AmqpValue(map));
|
||||||
|
|
||||||
|
EncodedMessage em = encodeMessage(message);
|
||||||
|
|
||||||
|
JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer();
|
||||||
|
javax.jms.Message jmsMessage = transformer.transform(em);
|
||||||
|
|
||||||
|
assertNotNull("Message should not be null", jmsMessage);
|
||||||
|
assertEquals("Unexpected message class type", ActiveMQMapMessage.class, jmsMessage.getClass());
|
||||||
|
|
||||||
|
MapMessage mapMessage = (MapMessage) jmsMessage;
|
||||||
|
byte[] outputBytes = mapMessage.getBytes(ENTRY_NAME);
|
||||||
|
assertNotNull(outputBytes);
|
||||||
|
assertTrue(Arrays.equals(inputBytes, outputBytes));
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Test that an amqp-value body containing a list results in an StreamMessage
|
* Test that an amqp-value body containing a list results in an StreamMessage
|
||||||
* when not otherwise annotated to indicate the type of JMS message it is.
|
* when not otherwise annotated to indicate the type of JMS message it is.
|
||||||
|
|
|
@ -275,6 +275,34 @@ public class JMSMappingOutboundTransformerTest {
|
||||||
assertTrue(((AmqpValue) amqp.getBody()).getValue() instanceof Map);
|
assertTrue(((AmqpValue) amqp.getBody()).getValue() instanceof Map);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testConvertMapMessageToAmqpMessageWithByteArrayValueInBody() throws Exception {
|
||||||
|
final byte[] byteArray = new byte[] { 1, 2, 3, 4, 5 };
|
||||||
|
|
||||||
|
ActiveMQMapMessage outbound = createMapMessage();
|
||||||
|
outbound.setBytes("bytes", byteArray);
|
||||||
|
outbound.onSend();
|
||||||
|
outbound.storeContent();
|
||||||
|
|
||||||
|
JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer();
|
||||||
|
|
||||||
|
EncodedMessage encoded = transformer.transform(outbound);
|
||||||
|
assertNotNull(encoded);
|
||||||
|
|
||||||
|
Message amqp = encoded.decode();
|
||||||
|
|
||||||
|
assertNotNull(amqp.getBody());
|
||||||
|
assertTrue(amqp.getBody() instanceof AmqpValue);
|
||||||
|
assertTrue(((AmqpValue) amqp.getBody()).getValue() instanceof Map);
|
||||||
|
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
Map<Object, Object> amqpMap = (Map<Object, Object>) ((AmqpValue) amqp.getBody()).getValue();
|
||||||
|
|
||||||
|
assertEquals(1, amqpMap.size());
|
||||||
|
Binary readByteArray = (Binary) amqpMap.get("bytes");
|
||||||
|
assertNotNull(readByteArray);
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testConvertMapMessageToAmqpMessage() throws Exception {
|
public void testConvertMapMessageToAmqpMessage() throws Exception {
|
||||||
ActiveMQMapMessage outbound = createMapMessage();
|
ActiveMQMapMessage outbound = createMapMessage();
|
||||||
|
|
Loading…
Reference in New Issue