This closes #942 ARTEMIS-902 OpenWire Compression Issue

This commit is contained in:
Andy Taylor 2016-12-23 15:08:03 +00:00
commit 79b48767d4
2 changed files with 50 additions and 35 deletions

View File

@ -490,10 +490,11 @@ public class OpenWireMessageConverter implements MessageConverter {
ByteArrayOutputStream bytesOut = new ByteArrayOutputStream(text.length() + 4);
OutputStream out = bytesOut;
if (isCompressed) {
out = new DeflaterOutputStream(out);
out = new DeflaterOutputStream(out, true);
}
try (DataOutputStream dataOut = new DataOutputStream(out)) {
MarshallingSupport.writeUTF8(dataOut, text.toString());
dataOut.flush();
bytes = bytesOut.toByteArray();
}
}
@ -506,10 +507,11 @@ public class OpenWireMessageConverter implements MessageConverter {
ByteArrayOutputStream out = new ByteArrayOutputStream(mapData.getEncodeSize());
OutputStream os = out;
if (isCompressed) {
os = new DeflaterOutputStream(os);
os = new DeflaterOutputStream(os, true);
}
try (DataOutputStream dataOut = new DataOutputStream(os)) {
MarshallingSupport.marshalPrimitiveMap(map, dataOut);
dataOut.flush();
}
bytes = out.toByteArray();
}
@ -520,8 +522,9 @@ public class OpenWireMessageConverter implements MessageConverter {
buffer.readBytes(bytes);
if (isCompressed) {
ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
try (DeflaterOutputStream out = new DeflaterOutputStream(bytesOut)) {
try (DeflaterOutputStream out = new DeflaterOutputStream(bytesOut, true)) {
out.write(bytes);
out.flush();
}
bytes = bytesOut.toByteArray();
}
@ -529,7 +532,7 @@ public class OpenWireMessageConverter implements MessageConverter {
org.apache.activemq.util.ByteArrayOutputStream bytesOut = new org.apache.activemq.util.ByteArrayOutputStream();
OutputStream out = bytesOut;
if (isCompressed) {
out = new DeflaterOutputStream(bytesOut);
out = new DeflaterOutputStream(bytesOut, true);
}
try (DataOutputStream dataOut = new DataOutputStream(out)) {
@ -583,6 +586,7 @@ public class OpenWireMessageConverter implements MessageConverter {
stop = true;
break;
}
dataOut.flush();
}
}
bytes = bytesOut.toByteArray();
@ -602,6 +606,7 @@ public class OpenWireMessageConverter implements MessageConverter {
int count = deflater.deflate(bytesBuf);
compressed.write(bytesBuf, 0, count);
}
compressed.flush();
ByteSequence byteSeq = compressed.toByteSequence();
ByteSequenceData.writeIntBig(byteSeq, length);
bytes = Arrays.copyOfRange(byteSeq.data, 0, byteSeq.length);
@ -615,8 +620,9 @@ public class OpenWireMessageConverter implements MessageConverter {
buffer.readBytes(bytes);
if (isCompressed) {
try (ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
DeflaterOutputStream out = new DeflaterOutputStream(bytesOut)) {
DeflaterOutputStream out = new DeflaterOutputStream(bytesOut, true)) {
out.write(bytes);
out.flush();
bytes = bytesOut.toByteArray();
}
}

View File

@ -18,6 +18,7 @@ package org.apache.activemq.artemis.tests.integration.openwire.interop;
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageConsumer;
@ -63,24 +64,32 @@ public class CompressedInteropTest extends BasicOpenWireTest {
xaFactory.setUseCompression(true);
}
@Test
public void testCoreReceiveOpenWireCompressedMessages() throws Exception {
testCompressedMessageSendReceive(true);
}
@Test
public void testOpenWireReceiveOpenWireCompressedMessages() throws Exception {
testCompressedMessageSendReceive(false);
}
private void testCompressedMessageSendReceive(boolean useCore) throws Exception {
//TextMessage
sendCompressedTextMessageUsingOpenWire();
receiveTextMessageUsingCore();
receiveTextMessage(useCore);
//BytesMessage
sendCompressedBytesMessageUsingOpenWire();
receiveBytesMessageUsingCore();
receiveBytesMessage(useCore);
//MapMessage
sendCompressedMapMessageUsingOpenWire();
receiveMapMessageUsingCore();
receiveMapMessage(useCore);
//StreamMessage
sendCompressedStreamMessageUsingOpenWire();
receiveStreamMessageUsingCore();
receiveStreamMessage(useCore);
//ObjectMessage
sendCompressedObjectMessageUsingOpenWire();
receiveObjectMessageUsingCore();
receiveObjectMessage(useCore);
}
private void sendCompressedStreamMessageUsingOpenWire() throws Exception {
@ -106,8 +115,8 @@ public class CompressedInteropTest extends BasicOpenWireTest {
producer.send(streamMessage);
}
private void receiveStreamMessageUsingCore() throws Exception {
StreamMessage streamMessage = (StreamMessage) receiveMessageUsingCore();
private void receiveStreamMessage(boolean useCore) throws Exception {
StreamMessage streamMessage = (StreamMessage) receiveMessage(useCore);
boolean booleanVal = streamMessage.readBoolean();
assertTrue(booleanVal);
byte byteVal = streamMessage.readByte();
@ -149,8 +158,8 @@ public class CompressedInteropTest extends BasicOpenWireTest {
producer.send(objectMessage);
}
private void receiveObjectMessageUsingCore() throws Exception {
ObjectMessage objectMessage = (ObjectMessage) receiveMessageUsingCore();
private void receiveObjectMessage(boolean useCore) throws Exception {
ObjectMessage objectMessage = (ObjectMessage) receiveMessage(useCore);
Object objectVal = objectMessage.getObject();
assertEquals(TEXT, objectVal);
}
@ -178,8 +187,8 @@ public class CompressedInteropTest extends BasicOpenWireTest {
producer.send(mapMessage);
}
private void receiveMapMessageUsingCore() throws Exception {
MapMessage mapMessage = (MapMessage) receiveMessageUsingCore();
private void receiveMapMessage(boolean useCore) throws Exception {
MapMessage mapMessage = (MapMessage) receiveMessage(useCore);
boolean booleanVal = mapMessage.getBoolean("boolean-type");
assertTrue(booleanVal);
@ -222,8 +231,8 @@ public class CompressedInteropTest extends BasicOpenWireTest {
producer.send(bytesMessage);
}
private void receiveBytesMessageUsingCore() throws Exception {
BytesMessage bytesMessage = (BytesMessage) receiveMessageUsingCore();
private void receiveBytesMessage(boolean useCore) throws Exception {
BytesMessage bytesMessage = (BytesMessage) receiveMessage(useCore);
byte[] bytes = new byte[TEXT.getBytes(StandardCharsets.UTF_8).length];
bytesMessage.readBytes(bytes);
@ -233,16 +242,28 @@ public class CompressedInteropTest extends BasicOpenWireTest {
assertEquals(TEXT, rcvString);
}
private void receiveTextMessageUsingCore() throws Exception {
TextMessage txtMessage = (TextMessage) receiveMessageUsingCore();
private void receiveTextMessage(boolean useCore) throws Exception {
TextMessage txtMessage = (TextMessage) receiveMessage(useCore);
assertEquals(TEXT, txtMessage.getText());
}
private Message receiveMessageUsingCore() throws Exception {
private void sendCompressedTextMessageUsingOpenWire() throws Exception {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
ActiveMQDestination destination = createDestination(session, ActiveMQDestination.QUEUE_TYPE);
final ActiveMQMessageProducer producer = (ActiveMQMessageProducer) session.createProducer(destination);
TextMessage textMessage = session.createTextMessage(TEXT);
producer.send(textMessage);
}
private Message receiveMessage(boolean useCore) throws Exception {
ConnectionFactory factoryToUse = useCore ? coreCf : factory;
Connection jmsConn = null;
Message message = null;
try {
jmsConn = coreCf.createConnection();
jmsConn = factoryToUse.createConnection();
jmsConn.start();
Session session = jmsConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
@ -257,16 +278,4 @@ public class CompressedInteropTest extends BasicOpenWireTest {
}
return message;
}
private void sendCompressedTextMessageUsingOpenWire() throws Exception {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
ActiveMQDestination destination = createDestination(session, ActiveMQDestination.QUEUE_TYPE);
final ActiveMQMessageProducer producer = (ActiveMQMessageProducer) session.createProducer(destination);
TextMessage textMessage = session.createTextMessage(TEXT);
producer.send(textMessage);
}
}