diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java index f49c972ca9..950210bdbd 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java @@ -490,10 +490,11 @@ public class OpenWireMessageConverter implements MessageConverter { ByteArrayOutputStream bytesOut = new ByteArrayOutputStream(text.length() + 4); OutputStream out = bytesOut; if (isCompressed) { - out = new DeflaterOutputStream(out); + out = new DeflaterOutputStream(out, true); } try (DataOutputStream dataOut = new DataOutputStream(out)) { MarshallingSupport.writeUTF8(dataOut, text.toString()); + dataOut.flush(); bytes = bytesOut.toByteArray(); } } @@ -506,10 +507,11 @@ public class OpenWireMessageConverter implements MessageConverter { ByteArrayOutputStream out = new ByteArrayOutputStream(mapData.getEncodeSize()); OutputStream os = out; if (isCompressed) { - os = new DeflaterOutputStream(os); + os = new DeflaterOutputStream(os, true); } try (DataOutputStream dataOut = new DataOutputStream(os)) { MarshallingSupport.marshalPrimitiveMap(map, dataOut); + dataOut.flush(); } bytes = out.toByteArray(); } @@ -520,8 +522,9 @@ public class OpenWireMessageConverter implements MessageConverter { buffer.readBytes(bytes); if (isCompressed) { ByteArrayOutputStream bytesOut = new ByteArrayOutputStream(); - try (DeflaterOutputStream out = new DeflaterOutputStream(bytesOut)) { + try (DeflaterOutputStream out = new DeflaterOutputStream(bytesOut, true)) { out.write(bytes); + out.flush(); } bytes = bytesOut.toByteArray(); } @@ -529,7 +532,7 @@ public class OpenWireMessageConverter implements MessageConverter { org.apache.activemq.util.ByteArrayOutputStream bytesOut = new org.apache.activemq.util.ByteArrayOutputStream(); OutputStream out = bytesOut; if (isCompressed) { - out = new DeflaterOutputStream(bytesOut); + out = new DeflaterOutputStream(bytesOut, true); } try (DataOutputStream dataOut = new DataOutputStream(out)) { @@ -583,6 +586,7 @@ public class OpenWireMessageConverter implements MessageConverter { stop = true; break; } + dataOut.flush(); } } bytes = bytesOut.toByteArray(); @@ -602,6 +606,7 @@ public class OpenWireMessageConverter implements MessageConverter { int count = deflater.deflate(bytesBuf); compressed.write(bytesBuf, 0, count); } + compressed.flush(); ByteSequence byteSeq = compressed.toByteSequence(); ByteSequenceData.writeIntBig(byteSeq, length); bytes = Arrays.copyOfRange(byteSeq.data, 0, byteSeq.length); @@ -615,8 +620,9 @@ public class OpenWireMessageConverter implements MessageConverter { buffer.readBytes(bytes); if (isCompressed) { try (ByteArrayOutputStream bytesOut = new ByteArrayOutputStream(); - DeflaterOutputStream out = new DeflaterOutputStream(bytesOut)) { + DeflaterOutputStream out = new DeflaterOutputStream(bytesOut, true)) { out.write(bytes); + out.flush(); bytes = bytesOut.toByteArray(); } } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/interop/CompressedInteropTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/interop/CompressedInteropTest.java index ada2e55397..196660924c 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/interop/CompressedInteropTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/interop/CompressedInteropTest.java @@ -18,6 +18,7 @@ package org.apache.activemq.artemis.tests.integration.openwire.interop; import javax.jms.BytesMessage; import javax.jms.Connection; +import javax.jms.ConnectionFactory; import javax.jms.MapMessage; import javax.jms.Message; import javax.jms.MessageConsumer; @@ -63,24 +64,32 @@ public class CompressedInteropTest extends BasicOpenWireTest { xaFactory.setUseCompression(true); } - @Test public void testCoreReceiveOpenWireCompressedMessages() throws Exception { + testCompressedMessageSendReceive(true); + } + + @Test + public void testOpenWireReceiveOpenWireCompressedMessages() throws Exception { + testCompressedMessageSendReceive(false); + } + + private void testCompressedMessageSendReceive(boolean useCore) throws Exception { //TextMessage sendCompressedTextMessageUsingOpenWire(); - receiveTextMessageUsingCore(); + receiveTextMessage(useCore); //BytesMessage sendCompressedBytesMessageUsingOpenWire(); - receiveBytesMessageUsingCore(); + receiveBytesMessage(useCore); //MapMessage sendCompressedMapMessageUsingOpenWire(); - receiveMapMessageUsingCore(); + receiveMapMessage(useCore); //StreamMessage sendCompressedStreamMessageUsingOpenWire(); - receiveStreamMessageUsingCore(); + receiveStreamMessage(useCore); //ObjectMessage sendCompressedObjectMessageUsingOpenWire(); - receiveObjectMessageUsingCore(); + receiveObjectMessage(useCore); } private void sendCompressedStreamMessageUsingOpenWire() throws Exception { @@ -106,8 +115,8 @@ public class CompressedInteropTest extends BasicOpenWireTest { producer.send(streamMessage); } - private void receiveStreamMessageUsingCore() throws Exception { - StreamMessage streamMessage = (StreamMessage) receiveMessageUsingCore(); + private void receiveStreamMessage(boolean useCore) throws Exception { + StreamMessage streamMessage = (StreamMessage) receiveMessage(useCore); boolean booleanVal = streamMessage.readBoolean(); assertTrue(booleanVal); byte byteVal = streamMessage.readByte(); @@ -149,8 +158,8 @@ public class CompressedInteropTest extends BasicOpenWireTest { producer.send(objectMessage); } - private void receiveObjectMessageUsingCore() throws Exception { - ObjectMessage objectMessage = (ObjectMessage) receiveMessageUsingCore(); + private void receiveObjectMessage(boolean useCore) throws Exception { + ObjectMessage objectMessage = (ObjectMessage) receiveMessage(useCore); Object objectVal = objectMessage.getObject(); assertEquals(TEXT, objectVal); } @@ -178,8 +187,8 @@ public class CompressedInteropTest extends BasicOpenWireTest { producer.send(mapMessage); } - private void receiveMapMessageUsingCore() throws Exception { - MapMessage mapMessage = (MapMessage) receiveMessageUsingCore(); + private void receiveMapMessage(boolean useCore) throws Exception { + MapMessage mapMessage = (MapMessage) receiveMessage(useCore); boolean booleanVal = mapMessage.getBoolean("boolean-type"); assertTrue(booleanVal); @@ -222,8 +231,8 @@ public class CompressedInteropTest extends BasicOpenWireTest { producer.send(bytesMessage); } - private void receiveBytesMessageUsingCore() throws Exception { - BytesMessage bytesMessage = (BytesMessage) receiveMessageUsingCore(); + private void receiveBytesMessage(boolean useCore) throws Exception { + BytesMessage bytesMessage = (BytesMessage) receiveMessage(useCore); byte[] bytes = new byte[TEXT.getBytes(StandardCharsets.UTF_8).length]; bytesMessage.readBytes(bytes); @@ -233,16 +242,28 @@ public class CompressedInteropTest extends BasicOpenWireTest { assertEquals(TEXT, rcvString); } - private void receiveTextMessageUsingCore() throws Exception { - TextMessage txtMessage = (TextMessage) receiveMessageUsingCore(); + private void receiveTextMessage(boolean useCore) throws Exception { + TextMessage txtMessage = (TextMessage) receiveMessage(useCore); assertEquals(TEXT, txtMessage.getText()); } - private Message receiveMessageUsingCore() throws Exception { + private void sendCompressedTextMessageUsingOpenWire() throws Exception { + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + ActiveMQDestination destination = createDestination(session, ActiveMQDestination.QUEUE_TYPE); + + final ActiveMQMessageProducer producer = (ActiveMQMessageProducer) session.createProducer(destination); + + TextMessage textMessage = session.createTextMessage(TEXT); + + producer.send(textMessage); + } + + private Message receiveMessage(boolean useCore) throws Exception { + ConnectionFactory factoryToUse = useCore ? coreCf : factory; Connection jmsConn = null; Message message = null; try { - jmsConn = coreCf.createConnection(); + jmsConn = factoryToUse.createConnection(); jmsConn.start(); Session session = jmsConn.createSession(false, Session.AUTO_ACKNOWLEDGE); @@ -257,16 +278,4 @@ public class CompressedInteropTest extends BasicOpenWireTest { } return message; } - - private void sendCompressedTextMessageUsingOpenWire() throws Exception { - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - ActiveMQDestination destination = createDestination(session, ActiveMQDestination.QUEUE_TYPE); - - final ActiveMQMessageProducer producer = (ActiveMQMessageProducer) session.createProducer(destination); - - TextMessage textMessage = session.createTextMessage(TEXT); - - producer.send(textMessage); - } - }