diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java index ebda1c18a9..182c7d832f 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java @@ -111,9 +111,13 @@ public final class OpenWireMessageConverter { final ActiveMQBuffer body = coreMessage.getBodyBuffer(); final ByteSequence contents = messageSend.getContent(); - if (contents == null && coreType == org.apache.activemq.artemis.api.core.Message.TEXT_TYPE) { - body.writeNullableString(null); - } else if (contents != null) { + if (contents == null) { + if (coreType == org.apache.activemq.artemis.api.core.Message.TEXT_TYPE) { + body.writeNullableString(null); + } else if (coreType == org.apache.activemq.artemis.api.core.Message.MAP_TYPE) { + body.writeByte(DataConstants.NULL); + } + } else { final boolean messageCompressed = messageSend.isCompressed(); if (messageCompressed) { coreMessage.putBooleanProperty(OpenWireConstants.AMQ_MSG_COMPRESSED, true); diff --git a/artemis-protocols/artemis-openwire-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverterTest.java b/artemis-protocols/artemis-openwire-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverterTest.java index d10de16abd..92a1bfb3c1 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverterTest.java +++ b/artemis-protocols/artemis-openwire-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverterTest.java @@ -17,6 +17,7 @@ package org.apache.activemq.artemis.core.protocol.openwire; import org.apache.activemq.ActiveMQMessageAuditNoSync; +import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ICoreMessage; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.core.message.impl.CoreMessage; @@ -28,7 +29,9 @@ import org.apache.activemq.artemis.reader.MessageUtil; import org.apache.activemq.artemis.utils.RandomUtil; import org.apache.activemq.artemis.utils.UUID; import org.apache.activemq.artemis.utils.UUIDGenerator; +import org.apache.activemq.artemis.utils.collections.TypedProperties; import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ActiveMQMapMessage; import org.apache.activemq.command.ActiveMQMessage; import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.MessageDispatch; @@ -135,6 +138,16 @@ public class OpenWireMessageConverterTest { } } + @Test + public void testEmptyMapMessage() throws Exception { + CoreMessage artemisMessage = (CoreMessage) OpenWireMessageConverter.inbound(new ActiveMQMapMessage().getMessage(), openWireFormat, null); + assertEquals(Message.MAP_TYPE, artemisMessage.getType()); + ActiveMQBuffer buffer = artemisMessage.getDataBuffer(); + TypedProperties map = new TypedProperties(); + buffer.resetReaderIndex(); + map.decode(buffer.byteBuf()); + } + @Test public void testProducerId() throws Exception { final String PRODUCER_ID = "123:456:789"; diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/multiprotocol/JMSMessageConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/multiprotocol/JMSMessageConsumerTest.java index 570f1c2eb8..988fdc14f0 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/multiprotocol/JMSMessageConsumerTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/multiprotocol/JMSMessageConsumerTest.java @@ -19,6 +19,7 @@ package org.apache.activemq.artemis.tests.integration.jms.multiprotocol; import javax.jms.Connection; import javax.jms.DeliveryMode; import javax.jms.JMSException; +import javax.jms.MapMessage; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageProducer; @@ -31,6 +32,7 @@ import org.apache.activemq.artemis.api.core.QueueConfiguration; import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.utils.DestinationUtil; +import org.apache.activemq.artemis.utils.RandomUtil; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -193,4 +195,146 @@ public class JMSMessageConsumerTest extends MultiprotocolJMSClientTestSupport { assertEquals("color = 'BLUE'", queue.getFilter().getFilterString().toString()); } } + + @Test(timeout = 30000) + public void testEmptyMapMessageConversionBetweenOpenWireAndAMQP() throws Exception { + testEmptyMapMessageConversion(createOpenWireConnection(), createConnection()); + } + + @Test(timeout = 30000) + public void testEmptyMapMessageConversionBetweenAMQPAndOpenWire() throws Exception { + testEmptyMapMessageConversion(createConnection(), createOpenWireConnection()); + } + + @Test(timeout = 30000) + public void testEmptyMapMessageConversionBetweenCoreAndAMQP() throws Exception { + testEmptyMapMessageConversion(createCoreConnection(), createConnection()); + } + + @Test(timeout = 30000) + public void testEmptyMapMessageConversionBetweenAMQPAndCore() throws Exception { + testEmptyMapMessageConversion(createConnection(), createCoreConnection()); + } + + @Test(timeout = 30000) + public void testEmptyMapMessageConversionBetweenCoreAndOpenWire() throws Exception { + testEmptyMapMessageConversion(createCoreConnection(), createOpenWireConnection()); + } + + @Test(timeout = 30000) + public void testEmptyMapMessageConversionBetweenOpenWireAndCore() throws Exception { + testEmptyMapMessageConversion(createOpenWireConnection(), createCoreConnection()); + } + + private void testEmptyMapMessageConversion(Connection senderConnection, Connection consumerConnection) throws Exception { + try { + Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumer = consumerSession.createConsumer(consumerSession.createQueue(getQueueName())); + + Session senderSession = senderConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = senderSession.createProducer(senderSession.createQueue(getQueueName())); + MapMessage message = senderSession.createMapMessage(); + producer.send(message); + + Message received = consumer.receive(1000); + + assertNotNull("Should have received a message by now.", received); + assertTrue("Should be an instance of MapMessage", received instanceof MapMessage); + } finally { + senderConnection.close(); + consumerConnection.close(); + } + } + + @Test(timeout = 30000) + public void testMapMessageConversionBetweenAMQPAndOpenWire() throws Exception { + testMapMessageConversion(createConnection(), createOpenWireConnection()); + } + + @Test(timeout = 30000) + public void testMapMessageConversionBetweenCoreAndAMQP() throws Exception { + testMapMessageConversion(createCoreConnection(), createConnection()); + } + + @Test(timeout = 30000) + public void testMapMessageConversionBetweenAMQPAndCore() throws Exception { + testMapMessageConversion(createConnection(), createCoreConnection()); + } + + @Test(timeout = 30000) + public void testMapMessageConversionBetweenCoreAndOpenWire() throws Exception { + testMapMessageConversion(createCoreConnection(), createOpenWireConnection()); + } + + @Test(timeout = 30000) + public void testMapMessageConversionBetweenOpenWireAndCore() throws Exception { + testMapMessageConversion(createOpenWireConnection(), createCoreConnection()); + } + + private void testMapMessageConversion(Connection senderConnection, Connection consumerConnection) throws Exception { + final boolean BOOLEAN_VALUE = RandomUtil.randomBoolean(); + final String BOOLEAN_KEY = "myBoolean"; + final byte BYTE_VALUE = RandomUtil.randomByte(); + final String BYTE_KEY = "myByte"; + final byte[] BYTES_VALUE = RandomUtil.randomBytes(); + final String BYTES_KEY = "myBytes"; + final char CHAR_VALUE = RandomUtil.randomChar(); + final String CHAR_KEY = "myChar"; + final double DOUBLE_VALUE = RandomUtil.randomDouble(); + final String DOUBLE_KEY = "myDouble"; + final float FLOAT_VALUE = RandomUtil.randomFloat(); + final String FLOAT_KEY = "myFloat"; + final int INT_VALUE = RandomUtil.randomInt(); + final String INT_KEY = "myInt"; + final long LONG_VALUE = RandomUtil.randomLong(); + final String LONG_KEY = "myLong"; + final Boolean OBJECT_VALUE = RandomUtil.randomBoolean(); + final String OBJECT_KEY = "myObject"; + final short SHORT_VALUE = RandomUtil.randomShort(); + final String SHORT_KEY = "myShort"; + final String STRING_VALUE = RandomUtil.randomString(); + final String STRING_KEY = "myString"; + + try { + Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumer = consumerSession.createConsumer(consumerSession.createQueue(getQueueName())); + + Session senderSession = senderConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = senderSession.createProducer(senderSession.createQueue(getQueueName())); + MapMessage message = senderSession.createMapMessage();message.setBoolean(BOOLEAN_KEY, BOOLEAN_VALUE); + message.setByte(BYTE_KEY, BYTE_VALUE); + message.setBytes(BYTES_KEY, BYTES_VALUE); + message.setChar(CHAR_KEY, CHAR_VALUE); + message.setDouble(DOUBLE_KEY, DOUBLE_VALUE); + message.setFloat(FLOAT_KEY, FLOAT_VALUE); + message.setInt(INT_KEY, INT_VALUE); + message.setLong(LONG_KEY, LONG_VALUE); + message.setObject(OBJECT_KEY, OBJECT_VALUE); + message.setShort(SHORT_KEY, SHORT_VALUE); + message.setString(STRING_KEY, STRING_VALUE); + producer.send(message); + + Message received = consumer.receive(1000); + + assertNotNull("Should have received a message by now.", received); + assertTrue("Should be an instance of MapMessage", received instanceof MapMessage); + MapMessage receivedMapMessage = (MapMessage) received; + + assertEquals(BOOLEAN_VALUE, receivedMapMessage.getBoolean(BOOLEAN_KEY)); + assertEquals(BYTE_VALUE, receivedMapMessage.getByte(BYTE_KEY)); + assertEqualsByteArrays(BYTES_VALUE, receivedMapMessage.getBytes(BYTES_KEY)); + assertEquals(CHAR_VALUE, receivedMapMessage.getChar(CHAR_KEY)); + assertEquals(DOUBLE_VALUE, receivedMapMessage.getDouble(DOUBLE_KEY), 0); + assertEquals(FLOAT_VALUE, receivedMapMessage.getFloat(FLOAT_KEY), 0); + assertEquals(INT_VALUE, receivedMapMessage.getInt(INT_KEY)); + assertEquals(LONG_VALUE, receivedMapMessage.getLong(LONG_KEY)); + assertTrue(receivedMapMessage.getObject(OBJECT_KEY) instanceof Boolean); + assertEquals(OBJECT_VALUE, receivedMapMessage.getObject(OBJECT_KEY)); + assertEquals(SHORT_VALUE, receivedMapMessage.getShort(SHORT_KEY)); + assertEquals(STRING_VALUE, receivedMapMessage.getString(STRING_KEY)); + } finally { + senderConnection.close(); + consumerConnection.close(); + } + } }