From 0abf52468ba22004fc51e4314275259f97caa4dd Mon Sep 17 00:00:00 2001 From: Howard Gao Date: Fri, 11 Sep 2015 15:29:49 +0800 Subject: [PATCH] ARTEMIS-200 Message Compression Support --- .../openwire/OpenWireMessageConverter.java | 177 ++++++++++-- .../command/MessageCompressionTest.java | 207 ++++++++++++++ .../interop/CompressedInteropTest.java | 263 ++++++++++++++++++ 3 files changed, 631 insertions(+), 16 deletions(-) create mode 100644 tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/interop/CompressedInteropTest.java diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java index 717ca8e193..bda4c22105 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java @@ -22,10 +22,17 @@ import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; import java.io.InputStream; +import java.io.OutputStream; +import java.util.Arrays; import java.util.Iterator; import java.util.Map; import java.util.Map.Entry; import java.util.Set; +import java.util.zip.Deflater; +import java.util.zip.DeflaterOutputStream; +import java.util.zip.Inflater; +import java.util.zip.InflaterInputStream; +import java.util.zip.InflaterOutputStream; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException; @@ -55,6 +62,7 @@ import org.apache.activemq.command.ProducerId; import org.apache.activemq.command.TransactionId; import org.apache.activemq.util.ByteArrayInputStream; import org.apache.activemq.util.ByteSequence; +import org.apache.activemq.util.ByteSequenceData; import org.apache.activemq.util.MarshallingSupport; import org.apache.activemq.wireformat.WireFormat; import org.fusesource.hawtbuf.UTF8Buffer; @@ -85,8 +93,8 @@ public class OpenWireMessageConverter implements MessageConverter { private static final String AMQ_MSG_TX_ID = AMQ_PREFIX + "TX_ID"; private static final String AMQ_MSG_USER_ID = AMQ_PREFIX + "USER_ID"; - private static final String AMQ_MSG_COMPRESSED = AMQ_PREFIX + "COMPRESSED"; private static final String AMQ_MSG_DROPPABLE = AMQ_PREFIX + "DROPPABLE"; + private static final String AMQ_MSG_COMPRESSED = AMQ_PREFIX + "COMPRESSED"; @Override public ServerMessage inbound(Object message) { @@ -118,9 +126,17 @@ public class OpenWireMessageConverter implements MessageConverter { ByteSequence contents = messageSend.getContent(); if (contents != null) { ActiveMQBuffer body = coreMessage.getBodyBuffer(); + boolean messageCompressed = messageSend.isCompressed(); + if (messageCompressed) { + coreMessage.putBooleanProperty(AMQ_MSG_COMPRESSED, messageCompressed); + } + switch (coreType) { case org.apache.activemq.artemis.api.core.Message.TEXT_TYPE: - ByteArrayInputStream tis = new ByteArrayInputStream(contents); + InputStream tis = new ByteArrayInputStream(contents); + if (messageCompressed) { + tis = new InflaterInputStream(tis); + } DataInputStream tdataIn = new DataInputStream(tis); String text = MarshallingSupport.readUTF8(tdataIn); tdataIn.close(); @@ -128,6 +144,9 @@ public class OpenWireMessageConverter implements MessageConverter { break; case org.apache.activemq.artemis.api.core.Message.MAP_TYPE: InputStream mis = new ByteArrayInputStream(contents); + if (messageCompressed) { + mis = new InflaterInputStream(mis); + } DataInputStream mdataIn = new DataInputStream(mis); Map map = MarshallingSupport.unmarshalPrimitiveMap(mdataIn); mdataIn.close(); @@ -136,11 +155,33 @@ public class OpenWireMessageConverter implements MessageConverter { props.encode(body); break; case org.apache.activemq.artemis.api.core.Message.OBJECT_TYPE: + if (messageCompressed) { + InputStream ois = new ByteArrayInputStream(contents); + ois = new InflaterInputStream(ois); + org.apache.activemq.util.ByteArrayOutputStream decompressed = new org.apache.activemq.util.ByteArrayOutputStream(); + + try { + byte[] buf = new byte[1024]; + int n = ois.read(buf); + while (n != -1) { + decompressed.write(buf, 0, n); + n = ois.read(); + } + //read done + contents = decompressed.toByteSequence(); + } + finally { + decompressed.close(); + } + } body.writeInt(contents.length); body.writeBytes(contents.data, contents.offset, contents.length); break; case org.apache.activemq.artemis.api.core.Message.STREAM_TYPE: InputStream sis = new ByteArrayInputStream(contents); + if (messageCompressed) { + sis = new InflaterInputStream(sis); + } DataInputStream sdis = new DataInputStream(sis); int stype = sdis.read(); while (stype != -1) { @@ -210,7 +251,47 @@ public class OpenWireMessageConverter implements MessageConverter { } sdis.close(); break; + case org.apache.activemq.artemis.api.core.Message.BYTES_TYPE: + if (messageCompressed) { + Inflater inflater = new Inflater(); + org.apache.activemq.util.ByteArrayOutputStream decompressed = new org.apache.activemq.util.ByteArrayOutputStream(); + try { + int length = ByteSequenceData.readIntBig(contents); + contents.offset = 0; + byte[] data = Arrays.copyOfRange(contents.getData(), 4, contents.getLength()); + + inflater.setInput(data); + byte[] buffer = new byte[length]; + int count = inflater.inflate(buffer); + decompressed.write(buffer, 0, count); + contents = decompressed.toByteSequence(); + } + catch (Exception e) { + throw new IOException(e); + } + finally { + inflater.end(); + decompressed.close(); + } + } + body.writeBytes(contents.data, contents.offset, contents.length); + break; default: + if (messageCompressed) { + org.apache.activemq.util.ByteArrayOutputStream decompressed = new org.apache.activemq.util.ByteArrayOutputStream(); + OutputStream os = new InflaterOutputStream(decompressed); + try { + os.write(contents.data, contents.offset, contents.getLength()); + contents = decompressed.toByteSequence(); + } + catch (Exception e) { + throw new IOException(e); + } + finally { + os.close(); + decompressed.close(); + } + } body.writeBytes(contents.data, contents.offset, contents.length); break; } @@ -317,7 +398,6 @@ public class OpenWireMessageConverter implements MessageConverter { if (userId != null) { coreMessage.putStringProperty(AMQ_MSG_USER_ID, userId); } - coreMessage.putBooleanProperty(AMQ_MSG_COMPRESSED, messageSend.isCompressed()); coreMessage.putBooleanProperty(AMQ_MSG_DROPPABLE, messageSend.isDroppable()); } @@ -357,7 +437,7 @@ public class OpenWireMessageConverter implements MessageConverter { public static MessageDispatch createMessageDispatch(ServerMessage message, int deliveryCount, - AMQConsumer consumer) throws IOException { + AMQConsumer consumer) throws IOException, JMSException { ActiveMQMessage amqMessage = toAMQMessage(message, consumer.getMarshaller(), consumer.getActualDestination()); MessageDispatch md = new MessageDispatch(); @@ -412,18 +492,25 @@ public class OpenWireMessageConverter implements MessageConverter { amqMsg.setBrokerInTime(brokerInTime); ActiveMQBuffer buffer = coreMessage.getBodyBufferCopy(); + Boolean compressProp = (Boolean)coreMessage.getObjectProperty(AMQ_MSG_COMPRESSED); + boolean isCompressed = compressProp == null ? false : compressProp.booleanValue(); + amqMsg.setCompressed(isCompressed); + if (buffer != null) { buffer.resetReaderIndex(); byte[] bytes = null; synchronized (buffer) { if (coreType == org.apache.activemq.artemis.api.core.Message.TEXT_TYPE) { SimpleString text = buffer.readNullableSimpleString(); - if (text != null) { - ByteArrayOutputStream out = new ByteArrayOutputStream(text.length() + 4); + ByteArrayOutputStream bytesOut = new ByteArrayOutputStream(text.length() + 4); + OutputStream out = bytesOut; + if (isCompressed) { + out = new DeflaterOutputStream(out); + } DataOutputStream dataOut = new DataOutputStream(out); MarshallingSupport.writeUTF8(dataOut, text.toString()); - bytes = out.toByteArray(); + bytes = bytesOut.toByteArray(); out.close(); } } @@ -433,18 +520,33 @@ public class OpenWireMessageConverter implements MessageConverter { Map map = mapData.getMap(); ByteArrayOutputStream out = new ByteArrayOutputStream(mapData.getEncodeSize()); - DataOutputStream dataOut = new DataOutputStream(out); + OutputStream os = out; + if (isCompressed) { + os = new DeflaterOutputStream(os); + } + DataOutputStream dataOut = new DataOutputStream(os); MarshallingSupport.marshalPrimitiveMap(map, dataOut); - bytes = out.toByteArray(); dataOut.close(); + bytes = out.toByteArray(); } else if (coreType == org.apache.activemq.artemis.api.core.Message.OBJECT_TYPE) { int len = buffer.readInt(); bytes = new byte[len]; buffer.readBytes(bytes); + if (isCompressed) { + ByteArrayOutputStream bytesOut = new ByteArrayOutputStream(); + DeflaterOutputStream out = new DeflaterOutputStream(bytesOut); + out.write(bytes); + out.close(); + bytes = bytesOut.toByteArray(); + } } else if (coreType == org.apache.activemq.artemis.api.core.Message.STREAM_TYPE) { - ByteArrayOutputStream out = new ByteArrayOutputStream(buffer.readableBytes()); + org.apache.activemq.util.ByteArrayOutputStream bytesOut = new org.apache.activemq.util.ByteArrayOutputStream(); + OutputStream out = bytesOut; + if (isCompressed) { + out = new DeflaterOutputStream(bytesOut); + } DataOutputStream dataOut = new DataOutputStream(out); boolean stop = false; @@ -499,13 +601,52 @@ public class OpenWireMessageConverter implements MessageConverter { break; } } - bytes = out.toByteArray(); dataOut.close(); + bytes = bytesOut.toByteArray(); + } + else if (coreType == org.apache.activemq.artemis.api.core.Message.BYTES_TYPE) { + int n = buffer.readableBytes(); + bytes = new byte[n]; + buffer.readBytes(bytes); + if (isCompressed) { + int length = bytes.length; + org.apache.activemq.util.ByteArrayOutputStream compressed = new org.apache.activemq.util.ByteArrayOutputStream(); + compressed.write(new byte[4]); + Deflater deflater = new Deflater(); + try { + deflater.setInput(bytes); + deflater.finish(); + byte[] bytesBuf = new byte[1024]; + while (!deflater.finished()) { + int count = deflater.deflate(bytesBuf); + compressed.write(bytesBuf, 0, count); + } + ByteSequence byteSeq = compressed.toByteSequence(); + ByteSequenceData.writeIntBig(byteSeq, length); + bytes = Arrays.copyOfRange(byteSeq.data, 0, byteSeq.length); + } + finally { + deflater.end(); + compressed.close(); + } + } } else { int n = buffer.readableBytes(); bytes = new byte[n]; buffer.readBytes(bytes); + if (isCompressed) { + ByteArrayOutputStream bytesOut = new ByteArrayOutputStream(); + DeflaterOutputStream out = new DeflaterOutputStream(bytesOut); + try { + out.write(bytes); + bytes = bytesOut.toByteArray(); + } + finally { + out.close(); + bytesOut.close(); + } + } } buffer.resetReaderIndex();// this is important for topics as the buffer @@ -642,10 +783,6 @@ public class OpenWireMessageConverter implements MessageConverter { amqMsg.setUserID(userId); } - Boolean isCompressed = (Boolean) coreMessage.getObjectProperty(AMQ_MSG_COMPRESSED); - if (isCompressed != null) { - amqMsg.setCompressed(isCompressed); - } Boolean isDroppable = (Boolean) coreMessage.getObjectProperty(AMQ_MSG_DROPPABLE); if (isDroppable != null) { amqMsg.setDroppable(isDroppable); @@ -660,11 +797,12 @@ public class OpenWireMessageConverter implements MessageConverter { throw new IOException("failure to set dlq property " + dlqCause, e); } } + Set props = coreMessage.getPropertyNames(); if (props != null) { for (SimpleString s : props) { String keyStr = s.toString(); - if (keyStr.startsWith("__AMQ") || keyStr.startsWith("__HDR_")) { + if (keyStr.startsWith("_AMQ") || keyStr.startsWith("__HDR_")) { continue; } Object prop = coreMessage.getObjectProperty(s); @@ -681,6 +819,13 @@ public class OpenWireMessageConverter implements MessageConverter { } } } + try { + amqMsg.onSend(); + amqMsg.setCompressed(isCompressed); + } + catch (JMSException e) { + throw new IOException("Failed to covert to Openwire message", e); + } return amqMsg; } diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/command/MessageCompressionTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/command/MessageCompressionTest.java index 6b5cc45ff3..fc4182b30d 100644 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/command/MessageCompressionTest.java +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/command/MessageCompressionTest.java @@ -20,9 +20,12 @@ import java.io.UnsupportedEncodingException; import javax.jms.BytesMessage; import javax.jms.JMSException; +import javax.jms.MapMessage; import javax.jms.MessageConsumer; import javax.jms.MessageProducer; +import javax.jms.ObjectMessage; import javax.jms.Session; +import javax.jms.StreamMessage; import junit.framework.TestCase; @@ -66,6 +69,7 @@ public class MessageCompressionTest extends TestCase { sendTestMessage(factory, TEXT); message = receiveTestMessage(factory); int unCompressedSize = message.getContent().getLength(); + assertEquals(TEXT, message.getText()); assertTrue("expected: compressed Size '" + compressedSize + "' < unCompressedSize '" + unCompressedSize + "'", compressedSize < unCompressedSize); } @@ -93,6 +97,209 @@ public class MessageCompressionTest extends TestCase { assertTrue("expected: compressed Size '" + compressedSize + "' < unCompressedSize '" + unCompressedSize + "'", compressedSize < unCompressedSize); } + public void testMapMessageCompression() throws Exception { + + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connectionUri); + factory.setUseCompression(true); + + sendTestMapMessage(factory, TEXT); + ActiveMQMapMessage mapMessage = receiveTestMapMessage(factory); + int compressedSize = mapMessage.getContent().getLength(); + assertTrue(mapMessage.isCompressed()); + + boolean booleanVal = mapMessage.getBoolean("boolean-type"); + assertTrue(booleanVal); + byte byteVal = mapMessage.getByte("byte-type"); + assertEquals((byte)10, byteVal); + byte[] bytesVal = mapMessage.getBytes("bytes-type"); + byte[] originVal = TEXT.getBytes(); + assertEquals(originVal.length, bytesVal.length); + for (int i = 0; i < bytesVal.length; i++) { + assertTrue(bytesVal[i] == originVal[i]); + } + char charVal = mapMessage.getChar("char-type"); + assertEquals('A', charVal); + double doubleVal = mapMessage.getDouble("double-type"); + assertEquals(55.3D, doubleVal, 0.1D); + float floatVal = mapMessage.getFloat("float-type"); + assertEquals(79.1F, floatVal, 0.1F); + int intVal = mapMessage.getInt("int-type"); + assertEquals(37, intVal); + long longVal = mapMessage.getLong("long-type"); + assertEquals(56652L, longVal); + Object objectVal = mapMessage.getObject("object-type"); + Object origVal = new String("VVVV"); + assertTrue(objectVal.equals(origVal)); + short shortVal = mapMessage.getShort("short-type"); + assertEquals((short) 333, shortVal); + String strVal = mapMessage.getString("string-type"); + assertEquals(TEXT, strVal); + + factory = new ActiveMQConnectionFactory(connectionUri); + factory.setUseCompression(false); + sendTestMapMessage(factory, TEXT); + mapMessage = receiveTestMapMessage(factory); + int unCompressedSize = mapMessage.getContent().getLength(); + + assertTrue("expected: compressed Size '" + compressedSize + "' < unCompressedSize '" + unCompressedSize + "'", compressedSize < unCompressedSize); + } + + public void testStreamMessageCompression() throws Exception { + + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connectionUri); + factory.setUseCompression(true); + + sendTestStreamMessage(factory, TEXT); + ActiveMQStreamMessage streamMessage = receiveTestStreamMessage(factory); + int compressedSize = streamMessage.getContent().getLength(); + assertTrue(streamMessage.isCompressed()); + + boolean booleanVal = streamMessage.readBoolean(); + assertTrue(booleanVal); + byte byteVal = streamMessage.readByte(); + assertEquals((byte)10, byteVal); + byte[] originVal = TEXT.getBytes(); + byte[] bytesVal = new byte[originVal.length]; + streamMessage.readBytes(bytesVal); + for (int i = 0; i < bytesVal.length; i++) { + assertTrue(bytesVal[i] == originVal[i]); + } + char charVal = streamMessage.readChar(); + assertEquals('A', charVal); + double doubleVal = streamMessage.readDouble(); + assertEquals(55.3D, doubleVal, 0.1D); + float floatVal = streamMessage.readFloat(); + assertEquals(79.1F, floatVal, 0.1F); + int intVal = streamMessage.readInt(); + assertEquals(37, intVal); + long longVal = streamMessage.readLong(); + assertEquals(56652L, longVal); + Object objectVal = streamMessage.readObject(); + Object origVal = new String("VVVV"); + assertTrue(objectVal.equals(origVal)); + short shortVal = streamMessage.readShort(); + assertEquals((short) 333, shortVal); + String strVal = streamMessage.readString(); + assertEquals(TEXT, strVal); + + factory = new ActiveMQConnectionFactory(connectionUri); + factory.setUseCompression(false); + sendTestStreamMessage(factory, TEXT); + streamMessage = receiveTestStreamMessage(factory); + int unCompressedSize = streamMessage.getContent().getLength(); + + System.out.println("compressedSize: " + compressedSize + " un: " + unCompressedSize); + assertTrue("expected: compressed Size '" + compressedSize + "' < unCompressedSize '" + unCompressedSize + "'", compressedSize < unCompressedSize); + } + + public void testObjectMessageCompression() throws Exception { + + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connectionUri); + factory.setUseCompression(true); + + sendTestObjectMessage(factory, TEXT); + ActiveMQObjectMessage objectMessage = receiveTestObjectMessage(factory); + int compressedSize = objectMessage.getContent().getLength(); + assertTrue(objectMessage.isCompressed()); + + Object objectVal = objectMessage.getObject(); + assertEquals(TEXT, objectVal); + + factory = new ActiveMQConnectionFactory(connectionUri); + factory.setUseCompression(false); + sendTestObjectMessage(factory, TEXT); + objectMessage = receiveTestObjectMessage(factory); + int unCompressedSize = objectMessage.getContent().getLength(); + + assertTrue("expected: compressed Size '" + compressedSize + "' < unCompressedSize '" + unCompressedSize + "'", compressedSize < unCompressedSize); + } + + private void sendTestObjectMessage(ActiveMQConnectionFactory factory, String message) throws JMSException { + ActiveMQConnection connection = (ActiveMQConnection) factory.createConnection(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = session.createProducer(queue); + ObjectMessage objectMessage = session.createObjectMessage(); + + objectMessage.setObject(TEXT); + + producer.send(objectMessage); + connection.close(); + } + + private ActiveMQObjectMessage receiveTestObjectMessage(ActiveMQConnectionFactory factory) throws JMSException { + ActiveMQConnection connection = (ActiveMQConnection) factory.createConnection(); + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumer = session.createConsumer(queue); + ActiveMQObjectMessage rc = (ActiveMQObjectMessage) consumer.receive(); + connection.close(); + return rc; + } + + private void sendTestStreamMessage(ActiveMQConnectionFactory factory, String message) throws JMSException { + ActiveMQConnection connection = (ActiveMQConnection) factory.createConnection(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = session.createProducer(queue); + StreamMessage streamMessage = session.createStreamMessage(); + + streamMessage.writeBoolean(true); + streamMessage.writeByte((byte) 10); + streamMessage.writeBytes(TEXT.getBytes()); + streamMessage.writeChar('A'); + streamMessage.writeDouble(55.3D); + streamMessage.writeFloat(79.1F); + streamMessage.writeInt(37); + streamMessage.writeLong(56652L); + streamMessage.writeObject(new String("VVVV")); + streamMessage.writeShort((short) 333); + streamMessage.writeString(TEXT); + + producer.send(streamMessage); + connection.close(); + } + + private ActiveMQStreamMessage receiveTestStreamMessage(ActiveMQConnectionFactory factory) throws JMSException { + ActiveMQConnection connection = (ActiveMQConnection) factory.createConnection(); + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumer = session.createConsumer(queue); + ActiveMQStreamMessage rc = (ActiveMQStreamMessage) consumer.receive(); + connection.close(); + return rc; + } + + private void sendTestMapMessage(ActiveMQConnectionFactory factory, String message) throws JMSException { + ActiveMQConnection connection = (ActiveMQConnection) factory.createConnection(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = session.createProducer(queue); + MapMessage mapMessage = session.createMapMessage(); + + mapMessage.setBoolean("boolean-type", true); + mapMessage.setByte("byte-type", (byte) 10); + mapMessage.setBytes("bytes-type", TEXT.getBytes()); + mapMessage.setChar("char-type", 'A'); + mapMessage.setDouble("double-type", 55.3D); + mapMessage.setFloat("float-type", 79.1F); + mapMessage.setInt("int-type", 37); + mapMessage.setLong("long-type", 56652L); + mapMessage.setObject("object-type", new String("VVVV")); + mapMessage.setShort("short-type", (short) 333); + mapMessage.setString("string-type", TEXT); + + producer.send(mapMessage); + connection.close(); + } + + private ActiveMQMapMessage receiveTestMapMessage(ActiveMQConnectionFactory factory) throws JMSException { + ActiveMQConnection connection = (ActiveMQConnection) factory.createConnection(); + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumer = session.createConsumer(queue); + ActiveMQMapMessage rc = (ActiveMQMapMessage) consumer.receive(); + connection.close(); + return rc; + } + private void sendTestMessage(ActiveMQConnectionFactory factory, String message) throws JMSException { ActiveMQConnection connection = (ActiveMQConnection) factory.createConnection(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/interop/CompressedInteropTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/interop/CompressedInteropTest.java new file mode 100644 index 0000000000..2123f531ea --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/interop/CompressedInteropTest.java @@ -0,0 +1,263 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.openwire.interop; + +import org.apache.activemq.ActiveMQMessageProducer; +import org.apache.activemq.artemis.tests.integration.openwire.BasicOpenWireTest; +import org.apache.activemq.command.ActiveMQDestination; +import org.junit.Before; +import org.junit.Test; + +import javax.jms.BytesMessage; +import javax.jms.Connection; +import javax.jms.MapMessage; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.ObjectMessage; +import javax.jms.Queue; +import javax.jms.Session; +import javax.jms.StreamMessage; +import javax.jms.TextMessage; + +public class CompressedInteropTest extends BasicOpenWireTest { + + private static final String TEXT; + static { + StringBuilder builder = new StringBuilder(); + + for (int i = 0; i < 20; i++) { + builder.append("The quick red fox jumped over the lazy brown dog. "); + } + TEXT = builder.toString(); + } + + @Before + @Override + public void setUp() throws Exception { + factory.setUseCompression(true); + super.setUp(); + connection.start(); + assertTrue(connection.isUseCompression()); + } + + @Test + public void testCoreReceiveOpenWireCompressedMessages() throws Exception { + //TextMessage + sendCompressedTextMessageUsingOpenWire(); + receiveTextMessageUsingCore(); + //BytesMessage + sendCompressedBytesMessageUsingOpenWire(); + receiveBytesMessageUsingCore(); + //MapMessage + sendCompressedMapMessageUsingOpenWire(); + receiveMapMessageUsingCore(); + //StreamMessage + sendCompressedStreamMessageUsingOpenWire(); + receiveStreamMessageUsingCore(); + //ObjectMessage + sendCompressedObjectMessageUsingOpenWire(); + receiveObjectMessageUsingCore(); + } + + private void sendCompressedStreamMessageUsingOpenWire() throws Exception { + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + ActiveMQDestination destination = createDestination(session, ActiveMQDestination.QUEUE_TYPE); + + final ActiveMQMessageProducer producer = (ActiveMQMessageProducer) session.createProducer(destination); + + StreamMessage streamMessage = session.createStreamMessage(); + + streamMessage.writeBoolean(true); + streamMessage.writeByte((byte) 10); + streamMessage.writeBytes(TEXT.getBytes()); + streamMessage.writeChar('A'); + streamMessage.writeDouble(55.3D); + streamMessage.writeFloat(79.1F); + streamMessage.writeInt(37); + streamMessage.writeLong(56652L); + streamMessage.writeObject(new String("VVVV")); + streamMessage.writeShort((short) 333); + streamMessage.writeString(TEXT); + + producer.send(streamMessage); + } + + private void receiveStreamMessageUsingCore() throws Exception { + StreamMessage streamMessage = (StreamMessage) receiveMessageUsingCore(); + boolean booleanVal = streamMessage.readBoolean(); + assertTrue(booleanVal); + byte byteVal = streamMessage.readByte(); + assertEquals((byte)10, byteVal); + byte[] originVal = TEXT.getBytes(); + byte[] bytesVal = new byte[originVal.length]; + streamMessage.readBytes(bytesVal); + for (int i = 0; i < bytesVal.length; i++) { + assertTrue(bytesVal[i] == originVal[i]); + } + char charVal = streamMessage.readChar(); + assertEquals('A', charVal); + double doubleVal = streamMessage.readDouble(); + assertEquals(55.3D, doubleVal, 0.1D); + float floatVal = streamMessage.readFloat(); + assertEquals(79.1F, floatVal, 0.1F); + int intVal = streamMessage.readInt(); + assertEquals(37, intVal); + long longVal = streamMessage.readLong(); + assertEquals(56652L, longVal); + Object objectVal = streamMessage.readObject(); + Object origVal = new String("VVVV"); + assertTrue(objectVal.equals(origVal)); + short shortVal = streamMessage.readShort(); + assertEquals((short) 333, shortVal); + String strVal = streamMessage.readString(); + assertEquals(TEXT, strVal); + } + + private void sendCompressedObjectMessageUsingOpenWire() throws Exception { + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + ActiveMQDestination destination = createDestination(session, ActiveMQDestination.QUEUE_TYPE); + + final ActiveMQMessageProducer producer = (ActiveMQMessageProducer) session.createProducer(destination); + + ObjectMessage objectMessage = session.createObjectMessage(); + objectMessage.setObject(TEXT); + + producer.send(objectMessage); + } + + private void receiveObjectMessageUsingCore() throws Exception { + ObjectMessage objectMessage = (ObjectMessage) receiveMessageUsingCore(); + Object objectVal = objectMessage.getObject(); + assertEquals(TEXT, objectVal); + } + + private void sendCompressedMapMessageUsingOpenWire() throws Exception { + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + ActiveMQDestination destination = createDestination(session, ActiveMQDestination.QUEUE_TYPE); + + final ActiveMQMessageProducer producer = (ActiveMQMessageProducer) session.createProducer(destination); + + MapMessage mapMessage = session.createMapMessage(); + + mapMessage.setBoolean("boolean-type", true); + mapMessage.setByte("byte-type", (byte) 10); + mapMessage.setBytes("bytes-type", TEXT.getBytes()); + mapMessage.setChar("char-type", 'A'); + mapMessage.setDouble("double-type", 55.3D); + mapMessage.setFloat("float-type", 79.1F); + mapMessage.setInt("int-type", 37); + mapMessage.setLong("long-type", 56652L); + mapMessage.setObject("object-type", new String("VVVV")); + mapMessage.setShort("short-type", (short) 333); + mapMessage.setString("string-type", TEXT); + + producer.send(mapMessage); + } + + private void receiveMapMessageUsingCore() throws Exception { + MapMessage mapMessage = (MapMessage) receiveMessageUsingCore(); + + boolean booleanVal = mapMessage.getBoolean("boolean-type"); + assertTrue(booleanVal); + byte byteVal = mapMessage.getByte("byte-type"); + assertEquals((byte)10, byteVal); + byte[] bytesVal = mapMessage.getBytes("bytes-type"); + byte[] originVal = TEXT.getBytes(); + assertEquals(originVal.length, bytesVal.length); + for (int i = 0; i < bytesVal.length; i++) { + assertTrue(bytesVal[i] == originVal[i]); + } + char charVal = mapMessage.getChar("char-type"); + assertEquals('A', charVal); + double doubleVal = mapMessage.getDouble("double-type"); + assertEquals(55.3D, doubleVal, 0.1D); + float floatVal = mapMessage.getFloat("float-type"); + assertEquals(79.1F, floatVal, 0.1F); + int intVal = mapMessage.getInt("int-type"); + assertEquals(37, intVal); + long longVal = mapMessage.getLong("long-type"); + assertEquals(56652L, longVal); + Object objectVal = mapMessage.getObject("object-type"); + Object origVal = new String("VVVV"); + assertTrue(objectVal.equals(origVal)); + short shortVal = mapMessage.getShort("short-type"); + assertEquals((short) 333, shortVal); + String strVal = mapMessage.getString("string-type"); + assertEquals(TEXT, strVal); + } + + private void sendCompressedBytesMessageUsingOpenWire() throws Exception { + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + ActiveMQDestination destination = createDestination(session, ActiveMQDestination.QUEUE_TYPE); + + final ActiveMQMessageProducer producer = (ActiveMQMessageProducer) session.createProducer(destination); + + BytesMessage bytesMessage = session.createBytesMessage(); + bytesMessage.writeBytes(TEXT.getBytes()); + + producer.send(bytesMessage); + } + + private void receiveBytesMessageUsingCore() throws Exception { + BytesMessage bytesMessage = (BytesMessage) receiveMessageUsingCore(); + + byte[] bytes = new byte[TEXT.getBytes("UTF8").length]; + bytesMessage.readBytes(bytes); + assertTrue(bytesMessage.readBytes(new byte[255]) == -1); + + String rcvString = new String(bytes, "UTF8"); + assertEquals(TEXT, rcvString); + } + + private void receiveTextMessageUsingCore() throws Exception { + TextMessage txtMessage = (TextMessage) receiveMessageUsingCore(); + assertEquals(TEXT, txtMessage.getText()); + } + + private Message receiveMessageUsingCore() throws Exception { + Connection jmsConn = null; + Message message = null; + try { + jmsConn = coreCf.createConnection(); + jmsConn.start(); + + Session session = jmsConn.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue(this.queueName); + MessageConsumer coreConsumer = session.createConsumer(queue); + + message = coreConsumer.receive(5000); + } + finally { + if (jmsConn != null) { + jmsConn.close(); + } + } + return message; + } + + private void sendCompressedTextMessageUsingOpenWire() throws Exception { + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + ActiveMQDestination destination = createDestination(session, ActiveMQDestination.QUEUE_TYPE); + + final ActiveMQMessageProducer producer = (ActiveMQMessageProducer) session.createProducer(destination); + + TextMessage textMessage = session.createTextMessage(TEXT); + + producer.send(textMessage); + } + +}