This commit is contained in:
Clebert Suconic 2017-02-22 11:35:03 -05:00
commit d01874ae82
2 changed files with 61 additions and 9 deletions

View File

@ -518,16 +518,18 @@ public class OpenWireMessageConverter implements MessageConverter {
}
} else if (coreType == org.apache.activemq.artemis.api.core.Message.OBJECT_TYPE) {
int len = buffer.readInt();
bytes = new byte[len];
buffer.readBytes(bytes);
if (isCompressed) {
ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
try (DeflaterOutputStream out = new DeflaterOutputStream(bytesOut, true)) {
out.write(bytes);
out.flush();
if (buffer.readableBytes() > 0) {
int len = buffer.readInt();
bytes = new byte[len];
buffer.readBytes(bytes);
if (isCompressed) {
ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
try (DeflaterOutputStream out = new DeflaterOutputStream(bytesOut, true)) {
out.write(bytes);
out.flush();
}
bytes = bytesOut.toByteArray();
}
bytes = bytesOut.toByteArray();
}
} else if (coreType == org.apache.activemq.artemis.api.core.Message.STREAM_TYPE) {
org.apache.activemq.util.ByteArrayOutputStream bytesOut = new org.apache.activemq.util.ByteArrayOutputStream();

View File

@ -16,6 +16,7 @@
*/
package org.apache.activemq.artemis.tests.integration.openwire;
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
@ -26,11 +27,13 @@ import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Queue;
import javax.jms.QueueReceiver;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.jms.StreamMessage;
import javax.jms.TemporaryQueue;
import javax.jms.TemporaryTopic;
import javax.jms.TextMessage;
@ -161,6 +164,53 @@ public class SimpleOpenWireTest extends BasicOpenWireTest {
}
}
@Test
public void testSendEmptyMessages() throws Exception {
Queue dest = new ActiveMQQueue(queueName);
QueueSession defaultQueueSession = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
QueueSender defaultSender = defaultQueueSession.createSender(dest);
defaultSender.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
connection.start();
Message msg = defaultQueueSession.createMessage();
msg.setStringProperty("testName", "testSendEmptyMessages");
defaultSender.send(msg);
QueueReceiver queueReceiver = defaultQueueSession.createReceiver(dest);
assertNotNull("Didn't receive message", queueReceiver.receive(1000));
//bytes
BytesMessage bytesMessage = defaultQueueSession.createBytesMessage();
bytesMessage.setStringProperty("testName", "testSendEmptyMessages");
defaultSender.send(bytesMessage);
assertNotNull("Didn't receive message", queueReceiver.receive(1000));
//map
MapMessage mapMessage = defaultQueueSession.createMapMessage();
mapMessage.setStringProperty("testName", "testSendEmptyMessages");
defaultSender.send(mapMessage);
assertNotNull("Didn't receive message", queueReceiver.receive(1000));
//object
ObjectMessage objMessage = defaultQueueSession.createObjectMessage();
objMessage.setStringProperty("testName", "testSendEmptyMessages");
defaultSender.send(objMessage);
assertNotNull("Didn't receive message", queueReceiver.receive(1000));
//stream
StreamMessage streamMessage = defaultQueueSession.createStreamMessage();
streamMessage.setStringProperty("testName", "testSendEmptyMessages");
defaultSender.send(streamMessage);
assertNotNull("Didn't receive message", queueReceiver.receive(1000));
//text
TextMessage textMessage = defaultQueueSession.createTextMessage();
textMessage.setStringProperty("testName", "testSendEmptyMessages");
defaultSender.send(textMessage);
assertNotNull("Didn't receive message", queueReceiver.receive(1000));
}
@Test
public void testXASimple() throws Exception {
XAConnection connection = xaFactory.createXAConnection();