diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java index 076b01fb1f..9b27b81d56 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java @@ -518,16 +518,18 @@ public class OpenWireMessageConverter implements MessageConverter { } } else if (coreType == org.apache.activemq.artemis.api.core.Message.OBJECT_TYPE) { - int len = buffer.readInt(); - bytes = new byte[len]; - buffer.readBytes(bytes); - if (isCompressed) { - ByteArrayOutputStream bytesOut = new ByteArrayOutputStream(); - try (DeflaterOutputStream out = new DeflaterOutputStream(bytesOut, true)) { - out.write(bytes); - out.flush(); + if (buffer.readableBytes() > 0) { + int len = buffer.readInt(); + bytes = new byte[len]; + buffer.readBytes(bytes); + if (isCompressed) { + ByteArrayOutputStream bytesOut = new ByteArrayOutputStream(); + try (DeflaterOutputStream out = new DeflaterOutputStream(bytesOut, true)) { + out.write(bytes); + out.flush(); + } + bytes = bytesOut.toByteArray(); } - bytes = bytesOut.toByteArray(); } } else if (coreType == org.apache.activemq.artemis.api.core.Message.STREAM_TYPE) { org.apache.activemq.util.ByteArrayOutputStream bytesOut = new org.apache.activemq.util.ByteArrayOutputStream(); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/SimpleOpenWireTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/SimpleOpenWireTest.java index 509bb6de0e..6eb45a8245 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/SimpleOpenWireTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/SimpleOpenWireTest.java @@ -16,6 +16,7 @@ */ package org.apache.activemq.artemis.tests.integration.openwire; +import javax.jms.BytesMessage; import javax.jms.Connection; import javax.jms.DeliveryMode; import javax.jms.Destination; @@ -26,11 +27,13 @@ import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.MessageProducer; +import javax.jms.ObjectMessage; import javax.jms.Queue; import javax.jms.QueueReceiver; import javax.jms.QueueSender; import javax.jms.QueueSession; import javax.jms.Session; +import javax.jms.StreamMessage; import javax.jms.TemporaryQueue; import javax.jms.TemporaryTopic; import javax.jms.TextMessage; @@ -161,6 +164,53 @@ public class SimpleOpenWireTest extends BasicOpenWireTest { } } + @Test + public void testSendEmptyMessages() throws Exception { + Queue dest = new ActiveMQQueue(queueName); + + QueueSession defaultQueueSession = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); + QueueSender defaultSender = defaultQueueSession.createSender(dest); + defaultSender.setDeliveryMode(DeliveryMode.NON_PERSISTENT); + connection.start(); + + Message msg = defaultQueueSession.createMessage(); + msg.setStringProperty("testName", "testSendEmptyMessages"); + defaultSender.send(msg); + + QueueReceiver queueReceiver = defaultQueueSession.createReceiver(dest); + assertNotNull("Didn't receive message", queueReceiver.receive(1000)); + + //bytes + BytesMessage bytesMessage = defaultQueueSession.createBytesMessage(); + bytesMessage.setStringProperty("testName", "testSendEmptyMessages"); + defaultSender.send(bytesMessage); + assertNotNull("Didn't receive message", queueReceiver.receive(1000)); + + //map + MapMessage mapMessage = defaultQueueSession.createMapMessage(); + mapMessage.setStringProperty("testName", "testSendEmptyMessages"); + defaultSender.send(mapMessage); + assertNotNull("Didn't receive message", queueReceiver.receive(1000)); + + //object + ObjectMessage objMessage = defaultQueueSession.createObjectMessage(); + objMessage.setStringProperty("testName", "testSendEmptyMessages"); + defaultSender.send(objMessage); + assertNotNull("Didn't receive message", queueReceiver.receive(1000)); + + //stream + StreamMessage streamMessage = defaultQueueSession.createStreamMessage(); + streamMessage.setStringProperty("testName", "testSendEmptyMessages"); + defaultSender.send(streamMessage); + assertNotNull("Didn't receive message", queueReceiver.receive(1000)); + + //text + TextMessage textMessage = defaultQueueSession.createTextMessage(); + textMessage.setStringProperty("testName", "testSendEmptyMessages"); + defaultSender.send(textMessage); + assertNotNull("Didn't receive message", queueReceiver.receive(1000)); + } + @Test public void testXASimple() throws Exception { XAConnection connection = xaFactory.createXAConnection();