ARTEMIS-981 OpenWire can't receive empty ObjectMessage
When sending an empty ObjectMessage, broker doesn't write a 'length' field to the message buffer. In delivery the broker tries to read the length from the buffer, which causes "IndexOutOfBoundsException". To fix it, we need to check if the buffer is empty or not, and only read it if the buffer is not empty.
This commit is contained in:
parent
6974b115f7
commit
2fabd059d8
|
@ -518,16 +518,18 @@ public class OpenWireMessageConverter implements MessageConverter {
|
|||
}
|
||||
|
||||
} else if (coreType == org.apache.activemq.artemis.api.core.Message.OBJECT_TYPE) {
|
||||
int len = buffer.readInt();
|
||||
bytes = new byte[len];
|
||||
buffer.readBytes(bytes);
|
||||
if (isCompressed) {
|
||||
ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
|
||||
try (DeflaterOutputStream out = new DeflaterOutputStream(bytesOut, true)) {
|
||||
out.write(bytes);
|
||||
out.flush();
|
||||
if (buffer.readableBytes() > 0) {
|
||||
int len = buffer.readInt();
|
||||
bytes = new byte[len];
|
||||
buffer.readBytes(bytes);
|
||||
if (isCompressed) {
|
||||
ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
|
||||
try (DeflaterOutputStream out = new DeflaterOutputStream(bytesOut, true)) {
|
||||
out.write(bytes);
|
||||
out.flush();
|
||||
}
|
||||
bytes = bytesOut.toByteArray();
|
||||
}
|
||||
bytes = bytesOut.toByteArray();
|
||||
}
|
||||
} else if (coreType == org.apache.activemq.artemis.api.core.Message.STREAM_TYPE) {
|
||||
org.apache.activemq.util.ByteArrayOutputStream bytesOut = new org.apache.activemq.util.ByteArrayOutputStream();
|
||||
|
|
|
@ -16,6 +16,7 @@
|
|||
*/
|
||||
package org.apache.activemq.artemis.tests.integration.openwire;
|
||||
|
||||
import javax.jms.BytesMessage;
|
||||
import javax.jms.Connection;
|
||||
import javax.jms.DeliveryMode;
|
||||
import javax.jms.Destination;
|
||||
|
@ -26,11 +27,13 @@ import javax.jms.Message;
|
|||
import javax.jms.MessageConsumer;
|
||||
import javax.jms.MessageListener;
|
||||
import javax.jms.MessageProducer;
|
||||
import javax.jms.ObjectMessage;
|
||||
import javax.jms.Queue;
|
||||
import javax.jms.QueueReceiver;
|
||||
import javax.jms.QueueSender;
|
||||
import javax.jms.QueueSession;
|
||||
import javax.jms.Session;
|
||||
import javax.jms.StreamMessage;
|
||||
import javax.jms.TemporaryQueue;
|
||||
import javax.jms.TemporaryTopic;
|
||||
import javax.jms.TextMessage;
|
||||
|
@ -161,6 +164,53 @@ public class SimpleOpenWireTest extends BasicOpenWireTest {
|
|||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSendEmptyMessages() throws Exception {
|
||||
Queue dest = new ActiveMQQueue(queueName);
|
||||
|
||||
QueueSession defaultQueueSession = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
QueueSender defaultSender = defaultQueueSession.createSender(dest);
|
||||
defaultSender.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
|
||||
connection.start();
|
||||
|
||||
Message msg = defaultQueueSession.createMessage();
|
||||
msg.setStringProperty("testName", "testSendEmptyMessages");
|
||||
defaultSender.send(msg);
|
||||
|
||||
QueueReceiver queueReceiver = defaultQueueSession.createReceiver(dest);
|
||||
assertNotNull("Didn't receive message", queueReceiver.receive(1000));
|
||||
|
||||
//bytes
|
||||
BytesMessage bytesMessage = defaultQueueSession.createBytesMessage();
|
||||
bytesMessage.setStringProperty("testName", "testSendEmptyMessages");
|
||||
defaultSender.send(bytesMessage);
|
||||
assertNotNull("Didn't receive message", queueReceiver.receive(1000));
|
||||
|
||||
//map
|
||||
MapMessage mapMessage = defaultQueueSession.createMapMessage();
|
||||
mapMessage.setStringProperty("testName", "testSendEmptyMessages");
|
||||
defaultSender.send(mapMessage);
|
||||
assertNotNull("Didn't receive message", queueReceiver.receive(1000));
|
||||
|
||||
//object
|
||||
ObjectMessage objMessage = defaultQueueSession.createObjectMessage();
|
||||
objMessage.setStringProperty("testName", "testSendEmptyMessages");
|
||||
defaultSender.send(objMessage);
|
||||
assertNotNull("Didn't receive message", queueReceiver.receive(1000));
|
||||
|
||||
//stream
|
||||
StreamMessage streamMessage = defaultQueueSession.createStreamMessage();
|
||||
streamMessage.setStringProperty("testName", "testSendEmptyMessages");
|
||||
defaultSender.send(streamMessage);
|
||||
assertNotNull("Didn't receive message", queueReceiver.receive(1000));
|
||||
|
||||
//text
|
||||
TextMessage textMessage = defaultQueueSession.createTextMessage();
|
||||
textMessage.setStringProperty("testName", "testSendEmptyMessages");
|
||||
defaultSender.send(textMessage);
|
||||
assertNotNull("Didn't receive message", queueReceiver.receive(1000));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testXASimple() throws Exception {
|
||||
XAConnection connection = xaFactory.createXAConnection();
|
||||
|
|
Loading…
Reference in New Issue