From 2fabd059d88322404067527c90e7ca7f802349a6 Mon Sep 17 00:00:00 2001 From: Howard Gao Date: Sun, 19 Feb 2017 15:02:46 +0800 Subject: [PATCH] ARTEMIS-981 OpenWire can't receive empty ObjectMessage When sending an empty ObjectMessage, broker doesn't write a 'length' field to the message buffer. In delivery the broker tries to read the length from the buffer, which causes "IndexOutOfBoundsException". To fix it, we need to check if the buffer is empty or not, and only read it if the buffer is not empty. --- .../openwire/OpenWireMessageConverter.java | 20 ++++---- .../openwire/SimpleOpenWireTest.java | 50 +++++++++++++++++++ 2 files changed, 61 insertions(+), 9 deletions(-) diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java index 076b01fb1f..9b27b81d56 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java @@ -518,16 +518,18 @@ public class OpenWireMessageConverter implements MessageConverter { } } else if (coreType == org.apache.activemq.artemis.api.core.Message.OBJECT_TYPE) { - int len = buffer.readInt(); - bytes = new byte[len]; - buffer.readBytes(bytes); - if (isCompressed) { - ByteArrayOutputStream bytesOut = new ByteArrayOutputStream(); - try (DeflaterOutputStream out = new DeflaterOutputStream(bytesOut, true)) { - out.write(bytes); - out.flush(); + if (buffer.readableBytes() > 0) { + int len = buffer.readInt(); + bytes = new byte[len]; + buffer.readBytes(bytes); + if (isCompressed) { + ByteArrayOutputStream bytesOut = new ByteArrayOutputStream(); + try (DeflaterOutputStream out = new DeflaterOutputStream(bytesOut, true)) { + out.write(bytes); + out.flush(); + } + bytes = bytesOut.toByteArray(); } - bytes = bytesOut.toByteArray(); } } else if (coreType == org.apache.activemq.artemis.api.core.Message.STREAM_TYPE) { org.apache.activemq.util.ByteArrayOutputStream bytesOut = new org.apache.activemq.util.ByteArrayOutputStream(); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/SimpleOpenWireTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/SimpleOpenWireTest.java index 509bb6de0e..6eb45a8245 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/SimpleOpenWireTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/SimpleOpenWireTest.java @@ -16,6 +16,7 @@ */ package org.apache.activemq.artemis.tests.integration.openwire; +import javax.jms.BytesMessage; import javax.jms.Connection; import javax.jms.DeliveryMode; import javax.jms.Destination; @@ -26,11 +27,13 @@ import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.MessageProducer; +import javax.jms.ObjectMessage; import javax.jms.Queue; import javax.jms.QueueReceiver; import javax.jms.QueueSender; import javax.jms.QueueSession; import javax.jms.Session; +import javax.jms.StreamMessage; import javax.jms.TemporaryQueue; import javax.jms.TemporaryTopic; import javax.jms.TextMessage; @@ -161,6 +164,53 @@ public class SimpleOpenWireTest extends BasicOpenWireTest { } } + @Test + public void testSendEmptyMessages() throws Exception { + Queue dest = new ActiveMQQueue(queueName); + + QueueSession defaultQueueSession = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); + QueueSender defaultSender = defaultQueueSession.createSender(dest); + defaultSender.setDeliveryMode(DeliveryMode.NON_PERSISTENT); + connection.start(); + + Message msg = defaultQueueSession.createMessage(); + msg.setStringProperty("testName", "testSendEmptyMessages"); + defaultSender.send(msg); + + QueueReceiver queueReceiver = defaultQueueSession.createReceiver(dest); + assertNotNull("Didn't receive message", queueReceiver.receive(1000)); + + //bytes + BytesMessage bytesMessage = defaultQueueSession.createBytesMessage(); + bytesMessage.setStringProperty("testName", "testSendEmptyMessages"); + defaultSender.send(bytesMessage); + assertNotNull("Didn't receive message", queueReceiver.receive(1000)); + + //map + MapMessage mapMessage = defaultQueueSession.createMapMessage(); + mapMessage.setStringProperty("testName", "testSendEmptyMessages"); + defaultSender.send(mapMessage); + assertNotNull("Didn't receive message", queueReceiver.receive(1000)); + + //object + ObjectMessage objMessage = defaultQueueSession.createObjectMessage(); + objMessage.setStringProperty("testName", "testSendEmptyMessages"); + defaultSender.send(objMessage); + assertNotNull("Didn't receive message", queueReceiver.receive(1000)); + + //stream + StreamMessage streamMessage = defaultQueueSession.createStreamMessage(); + streamMessage.setStringProperty("testName", "testSendEmptyMessages"); + defaultSender.send(streamMessage); + assertNotNull("Didn't receive message", queueReceiver.receive(1000)); + + //text + TextMessage textMessage = defaultQueueSession.createTextMessage(); + textMessage.setStringProperty("testName", "testSendEmptyMessages"); + defaultSender.send(textMessage); + assertNotNull("Didn't receive message", queueReceiver.receive(1000)); + } + @Test public void testXASimple() throws Exception { XAConnection connection = xaFactory.createXAConnection();