ARTEMIS-200 Message Compression Support

This commit is contained in:
Howard Gao 2015-09-11 15:29:49 +08:00
parent 6408fd0357
commit 0abf52468b
3 changed files with 631 additions and 16 deletions

View File

@ -22,10 +22,17 @@ import java.io.DataInputStream;
import java.io.DataOutputStream; import java.io.DataOutputStream;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.io.OutputStream;
import java.util.Arrays;
import java.util.Iterator; import java.util.Iterator;
import java.util.Map; import java.util.Map;
import java.util.Map.Entry; import java.util.Map.Entry;
import java.util.Set; import java.util.Set;
import java.util.zip.Deflater;
import java.util.zip.DeflaterOutputStream;
import java.util.zip.Inflater;
import java.util.zip.InflaterInputStream;
import java.util.zip.InflaterOutputStream;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException; import org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException;
@ -55,6 +62,7 @@ import org.apache.activemq.command.ProducerId;
import org.apache.activemq.command.TransactionId; import org.apache.activemq.command.TransactionId;
import org.apache.activemq.util.ByteArrayInputStream; import org.apache.activemq.util.ByteArrayInputStream;
import org.apache.activemq.util.ByteSequence; import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.ByteSequenceData;
import org.apache.activemq.util.MarshallingSupport; import org.apache.activemq.util.MarshallingSupport;
import org.apache.activemq.wireformat.WireFormat; import org.apache.activemq.wireformat.WireFormat;
import org.fusesource.hawtbuf.UTF8Buffer; import org.fusesource.hawtbuf.UTF8Buffer;
@ -85,8 +93,8 @@ public class OpenWireMessageConverter implements MessageConverter {
private static final String AMQ_MSG_TX_ID = AMQ_PREFIX + "TX_ID"; private static final String AMQ_MSG_TX_ID = AMQ_PREFIX + "TX_ID";
private static final String AMQ_MSG_USER_ID = AMQ_PREFIX + "USER_ID"; private static final String AMQ_MSG_USER_ID = AMQ_PREFIX + "USER_ID";
private static final String AMQ_MSG_COMPRESSED = AMQ_PREFIX + "COMPRESSED";
private static final String AMQ_MSG_DROPPABLE = AMQ_PREFIX + "DROPPABLE"; private static final String AMQ_MSG_DROPPABLE = AMQ_PREFIX + "DROPPABLE";
private static final String AMQ_MSG_COMPRESSED = AMQ_PREFIX + "COMPRESSED";
@Override @Override
public ServerMessage inbound(Object message) { public ServerMessage inbound(Object message) {
@ -118,9 +126,17 @@ public class OpenWireMessageConverter implements MessageConverter {
ByteSequence contents = messageSend.getContent(); ByteSequence contents = messageSend.getContent();
if (contents != null) { if (contents != null) {
ActiveMQBuffer body = coreMessage.getBodyBuffer(); ActiveMQBuffer body = coreMessage.getBodyBuffer();
boolean messageCompressed = messageSend.isCompressed();
if (messageCompressed) {
coreMessage.putBooleanProperty(AMQ_MSG_COMPRESSED, messageCompressed);
}
switch (coreType) { switch (coreType) {
case org.apache.activemq.artemis.api.core.Message.TEXT_TYPE: case org.apache.activemq.artemis.api.core.Message.TEXT_TYPE:
ByteArrayInputStream tis = new ByteArrayInputStream(contents); InputStream tis = new ByteArrayInputStream(contents);
if (messageCompressed) {
tis = new InflaterInputStream(tis);
}
DataInputStream tdataIn = new DataInputStream(tis); DataInputStream tdataIn = new DataInputStream(tis);
String text = MarshallingSupport.readUTF8(tdataIn); String text = MarshallingSupport.readUTF8(tdataIn);
tdataIn.close(); tdataIn.close();
@ -128,6 +144,9 @@ public class OpenWireMessageConverter implements MessageConverter {
break; break;
case org.apache.activemq.artemis.api.core.Message.MAP_TYPE: case org.apache.activemq.artemis.api.core.Message.MAP_TYPE:
InputStream mis = new ByteArrayInputStream(contents); InputStream mis = new ByteArrayInputStream(contents);
if (messageCompressed) {
mis = new InflaterInputStream(mis);
}
DataInputStream mdataIn = new DataInputStream(mis); DataInputStream mdataIn = new DataInputStream(mis);
Map<String, Object> map = MarshallingSupport.unmarshalPrimitiveMap(mdataIn); Map<String, Object> map = MarshallingSupport.unmarshalPrimitiveMap(mdataIn);
mdataIn.close(); mdataIn.close();
@ -136,11 +155,33 @@ public class OpenWireMessageConverter implements MessageConverter {
props.encode(body); props.encode(body);
break; break;
case org.apache.activemq.artemis.api.core.Message.OBJECT_TYPE: case org.apache.activemq.artemis.api.core.Message.OBJECT_TYPE:
if (messageCompressed) {
InputStream ois = new ByteArrayInputStream(contents);
ois = new InflaterInputStream(ois);
org.apache.activemq.util.ByteArrayOutputStream decompressed = new org.apache.activemq.util.ByteArrayOutputStream();
try {
byte[] buf = new byte[1024];
int n = ois.read(buf);
while (n != -1) {
decompressed.write(buf, 0, n);
n = ois.read();
}
//read done
contents = decompressed.toByteSequence();
}
finally {
decompressed.close();
}
}
body.writeInt(contents.length); body.writeInt(contents.length);
body.writeBytes(contents.data, contents.offset, contents.length); body.writeBytes(contents.data, contents.offset, contents.length);
break; break;
case org.apache.activemq.artemis.api.core.Message.STREAM_TYPE: case org.apache.activemq.artemis.api.core.Message.STREAM_TYPE:
InputStream sis = new ByteArrayInputStream(contents); InputStream sis = new ByteArrayInputStream(contents);
if (messageCompressed) {
sis = new InflaterInputStream(sis);
}
DataInputStream sdis = new DataInputStream(sis); DataInputStream sdis = new DataInputStream(sis);
int stype = sdis.read(); int stype = sdis.read();
while (stype != -1) { while (stype != -1) {
@ -210,7 +251,47 @@ public class OpenWireMessageConverter implements MessageConverter {
} }
sdis.close(); sdis.close();
break; break;
case org.apache.activemq.artemis.api.core.Message.BYTES_TYPE:
if (messageCompressed) {
Inflater inflater = new Inflater();
org.apache.activemq.util.ByteArrayOutputStream decompressed = new org.apache.activemq.util.ByteArrayOutputStream();
try {
int length = ByteSequenceData.readIntBig(contents);
contents.offset = 0;
byte[] data = Arrays.copyOfRange(contents.getData(), 4, contents.getLength());
inflater.setInput(data);
byte[] buffer = new byte[length];
int count = inflater.inflate(buffer);
decompressed.write(buffer, 0, count);
contents = decompressed.toByteSequence();
}
catch (Exception e) {
throw new IOException(e);
}
finally {
inflater.end();
decompressed.close();
}
}
body.writeBytes(contents.data, contents.offset, contents.length);
break;
default: default:
if (messageCompressed) {
org.apache.activemq.util.ByteArrayOutputStream decompressed = new org.apache.activemq.util.ByteArrayOutputStream();
OutputStream os = new InflaterOutputStream(decompressed);
try {
os.write(contents.data, contents.offset, contents.getLength());
contents = decompressed.toByteSequence();
}
catch (Exception e) {
throw new IOException(e);
}
finally {
os.close();
decompressed.close();
}
}
body.writeBytes(contents.data, contents.offset, contents.length); body.writeBytes(contents.data, contents.offset, contents.length);
break; break;
} }
@ -317,7 +398,6 @@ public class OpenWireMessageConverter implements MessageConverter {
if (userId != null) { if (userId != null) {
coreMessage.putStringProperty(AMQ_MSG_USER_ID, userId); coreMessage.putStringProperty(AMQ_MSG_USER_ID, userId);
} }
coreMessage.putBooleanProperty(AMQ_MSG_COMPRESSED, messageSend.isCompressed());
coreMessage.putBooleanProperty(AMQ_MSG_DROPPABLE, messageSend.isDroppable()); coreMessage.putBooleanProperty(AMQ_MSG_DROPPABLE, messageSend.isDroppable());
} }
@ -357,7 +437,7 @@ public class OpenWireMessageConverter implements MessageConverter {
public static MessageDispatch createMessageDispatch(ServerMessage message, public static MessageDispatch createMessageDispatch(ServerMessage message,
int deliveryCount, int deliveryCount,
AMQConsumer consumer) throws IOException { AMQConsumer consumer) throws IOException, JMSException {
ActiveMQMessage amqMessage = toAMQMessage(message, consumer.getMarshaller(), consumer.getActualDestination()); ActiveMQMessage amqMessage = toAMQMessage(message, consumer.getMarshaller(), consumer.getActualDestination());
MessageDispatch md = new MessageDispatch(); MessageDispatch md = new MessageDispatch();
@ -412,18 +492,25 @@ public class OpenWireMessageConverter implements MessageConverter {
amqMsg.setBrokerInTime(brokerInTime); amqMsg.setBrokerInTime(brokerInTime);
ActiveMQBuffer buffer = coreMessage.getBodyBufferCopy(); ActiveMQBuffer buffer = coreMessage.getBodyBufferCopy();
Boolean compressProp = (Boolean)coreMessage.getObjectProperty(AMQ_MSG_COMPRESSED);
boolean isCompressed = compressProp == null ? false : compressProp.booleanValue();
amqMsg.setCompressed(isCompressed);
if (buffer != null) { if (buffer != null) {
buffer.resetReaderIndex(); buffer.resetReaderIndex();
byte[] bytes = null; byte[] bytes = null;
synchronized (buffer) { synchronized (buffer) {
if (coreType == org.apache.activemq.artemis.api.core.Message.TEXT_TYPE) { if (coreType == org.apache.activemq.artemis.api.core.Message.TEXT_TYPE) {
SimpleString text = buffer.readNullableSimpleString(); SimpleString text = buffer.readNullableSimpleString();
if (text != null) { if (text != null) {
ByteArrayOutputStream out = new ByteArrayOutputStream(text.length() + 4); ByteArrayOutputStream bytesOut = new ByteArrayOutputStream(text.length() + 4);
OutputStream out = bytesOut;
if (isCompressed) {
out = new DeflaterOutputStream(out);
}
DataOutputStream dataOut = new DataOutputStream(out); DataOutputStream dataOut = new DataOutputStream(out);
MarshallingSupport.writeUTF8(dataOut, text.toString()); MarshallingSupport.writeUTF8(dataOut, text.toString());
bytes = out.toByteArray(); bytes = bytesOut.toByteArray();
out.close(); out.close();
} }
} }
@ -433,18 +520,33 @@ public class OpenWireMessageConverter implements MessageConverter {
Map<String, Object> map = mapData.getMap(); Map<String, Object> map = mapData.getMap();
ByteArrayOutputStream out = new ByteArrayOutputStream(mapData.getEncodeSize()); ByteArrayOutputStream out = new ByteArrayOutputStream(mapData.getEncodeSize());
DataOutputStream dataOut = new DataOutputStream(out); OutputStream os = out;
if (isCompressed) {
os = new DeflaterOutputStream(os);
}
DataOutputStream dataOut = new DataOutputStream(os);
MarshallingSupport.marshalPrimitiveMap(map, dataOut); MarshallingSupport.marshalPrimitiveMap(map, dataOut);
bytes = out.toByteArray();
dataOut.close(); dataOut.close();
bytes = out.toByteArray();
} }
else if (coreType == org.apache.activemq.artemis.api.core.Message.OBJECT_TYPE) { else if (coreType == org.apache.activemq.artemis.api.core.Message.OBJECT_TYPE) {
int len = buffer.readInt(); int len = buffer.readInt();
bytes = new byte[len]; bytes = new byte[len];
buffer.readBytes(bytes); buffer.readBytes(bytes);
if (isCompressed) {
ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
DeflaterOutputStream out = new DeflaterOutputStream(bytesOut);
out.write(bytes);
out.close();
bytes = bytesOut.toByteArray();
}
} }
else if (coreType == org.apache.activemq.artemis.api.core.Message.STREAM_TYPE) { else if (coreType == org.apache.activemq.artemis.api.core.Message.STREAM_TYPE) {
ByteArrayOutputStream out = new ByteArrayOutputStream(buffer.readableBytes()); org.apache.activemq.util.ByteArrayOutputStream bytesOut = new org.apache.activemq.util.ByteArrayOutputStream();
OutputStream out = bytesOut;
if (isCompressed) {
out = new DeflaterOutputStream(bytesOut);
}
DataOutputStream dataOut = new DataOutputStream(out); DataOutputStream dataOut = new DataOutputStream(out);
boolean stop = false; boolean stop = false;
@ -499,13 +601,52 @@ public class OpenWireMessageConverter implements MessageConverter {
break; break;
} }
} }
bytes = out.toByteArray();
dataOut.close(); dataOut.close();
bytes = bytesOut.toByteArray();
}
else if (coreType == org.apache.activemq.artemis.api.core.Message.BYTES_TYPE) {
int n = buffer.readableBytes();
bytes = new byte[n];
buffer.readBytes(bytes);
if (isCompressed) {
int length = bytes.length;
org.apache.activemq.util.ByteArrayOutputStream compressed = new org.apache.activemq.util.ByteArrayOutputStream();
compressed.write(new byte[4]);
Deflater deflater = new Deflater();
try {
deflater.setInput(bytes);
deflater.finish();
byte[] bytesBuf = new byte[1024];
while (!deflater.finished()) {
int count = deflater.deflate(bytesBuf);
compressed.write(bytesBuf, 0, count);
}
ByteSequence byteSeq = compressed.toByteSequence();
ByteSequenceData.writeIntBig(byteSeq, length);
bytes = Arrays.copyOfRange(byteSeq.data, 0, byteSeq.length);
}
finally {
deflater.end();
compressed.close();
}
}
} }
else { else {
int n = buffer.readableBytes(); int n = buffer.readableBytes();
bytes = new byte[n]; bytes = new byte[n];
buffer.readBytes(bytes); buffer.readBytes(bytes);
if (isCompressed) {
ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
DeflaterOutputStream out = new DeflaterOutputStream(bytesOut);
try {
out.write(bytes);
bytes = bytesOut.toByteArray();
}
finally {
out.close();
bytesOut.close();
}
}
} }
buffer.resetReaderIndex();// this is important for topics as the buffer buffer.resetReaderIndex();// this is important for topics as the buffer
@ -642,10 +783,6 @@ public class OpenWireMessageConverter implements MessageConverter {
amqMsg.setUserID(userId); amqMsg.setUserID(userId);
} }
Boolean isCompressed = (Boolean) coreMessage.getObjectProperty(AMQ_MSG_COMPRESSED);
if (isCompressed != null) {
amqMsg.setCompressed(isCompressed);
}
Boolean isDroppable = (Boolean) coreMessage.getObjectProperty(AMQ_MSG_DROPPABLE); Boolean isDroppable = (Boolean) coreMessage.getObjectProperty(AMQ_MSG_DROPPABLE);
if (isDroppable != null) { if (isDroppable != null) {
amqMsg.setDroppable(isDroppable); amqMsg.setDroppable(isDroppable);
@ -660,11 +797,12 @@ public class OpenWireMessageConverter implements MessageConverter {
throw new IOException("failure to set dlq property " + dlqCause, e); throw new IOException("failure to set dlq property " + dlqCause, e);
} }
} }
Set<SimpleString> props = coreMessage.getPropertyNames(); Set<SimpleString> props = coreMessage.getPropertyNames();
if (props != null) { if (props != null) {
for (SimpleString s : props) { for (SimpleString s : props) {
String keyStr = s.toString(); String keyStr = s.toString();
if (keyStr.startsWith("__AMQ") || keyStr.startsWith("__HDR_")) { if (keyStr.startsWith("_AMQ") || keyStr.startsWith("__HDR_")) {
continue; continue;
} }
Object prop = coreMessage.getObjectProperty(s); Object prop = coreMessage.getObjectProperty(s);
@ -681,6 +819,13 @@ public class OpenWireMessageConverter implements MessageConverter {
} }
} }
} }
try {
amqMsg.onSend();
amqMsg.setCompressed(isCompressed);
}
catch (JMSException e) {
throw new IOException("Failed to covert to Openwire message", e);
}
return amqMsg; return amqMsg;
} }

View File

@ -20,9 +20,12 @@ import java.io.UnsupportedEncodingException;
import javax.jms.BytesMessage; import javax.jms.BytesMessage;
import javax.jms.JMSException; import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.MessageConsumer; import javax.jms.MessageConsumer;
import javax.jms.MessageProducer; import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Session; import javax.jms.Session;
import javax.jms.StreamMessage;
import junit.framework.TestCase; import junit.framework.TestCase;
@ -66,6 +69,7 @@ public class MessageCompressionTest extends TestCase {
sendTestMessage(factory, TEXT); sendTestMessage(factory, TEXT);
message = receiveTestMessage(factory); message = receiveTestMessage(factory);
int unCompressedSize = message.getContent().getLength(); int unCompressedSize = message.getContent().getLength();
assertEquals(TEXT, message.getText());
assertTrue("expected: compressed Size '" + compressedSize + "' < unCompressedSize '" + unCompressedSize + "'", compressedSize < unCompressedSize); assertTrue("expected: compressed Size '" + compressedSize + "' < unCompressedSize '" + unCompressedSize + "'", compressedSize < unCompressedSize);
} }
@ -93,6 +97,209 @@ public class MessageCompressionTest extends TestCase {
assertTrue("expected: compressed Size '" + compressedSize + "' < unCompressedSize '" + unCompressedSize + "'", compressedSize < unCompressedSize); assertTrue("expected: compressed Size '" + compressedSize + "' < unCompressedSize '" + unCompressedSize + "'", compressedSize < unCompressedSize);
} }
public void testMapMessageCompression() throws Exception {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connectionUri);
factory.setUseCompression(true);
sendTestMapMessage(factory, TEXT);
ActiveMQMapMessage mapMessage = receiveTestMapMessage(factory);
int compressedSize = mapMessage.getContent().getLength();
assertTrue(mapMessage.isCompressed());
boolean booleanVal = mapMessage.getBoolean("boolean-type");
assertTrue(booleanVal);
byte byteVal = mapMessage.getByte("byte-type");
assertEquals((byte)10, byteVal);
byte[] bytesVal = mapMessage.getBytes("bytes-type");
byte[] originVal = TEXT.getBytes();
assertEquals(originVal.length, bytesVal.length);
for (int i = 0; i < bytesVal.length; i++) {
assertTrue(bytesVal[i] == originVal[i]);
}
char charVal = mapMessage.getChar("char-type");
assertEquals('A', charVal);
double doubleVal = mapMessage.getDouble("double-type");
assertEquals(55.3D, doubleVal, 0.1D);
float floatVal = mapMessage.getFloat("float-type");
assertEquals(79.1F, floatVal, 0.1F);
int intVal = mapMessage.getInt("int-type");
assertEquals(37, intVal);
long longVal = mapMessage.getLong("long-type");
assertEquals(56652L, longVal);
Object objectVal = mapMessage.getObject("object-type");
Object origVal = new String("VVVV");
assertTrue(objectVal.equals(origVal));
short shortVal = mapMessage.getShort("short-type");
assertEquals((short) 333, shortVal);
String strVal = mapMessage.getString("string-type");
assertEquals(TEXT, strVal);
factory = new ActiveMQConnectionFactory(connectionUri);
factory.setUseCompression(false);
sendTestMapMessage(factory, TEXT);
mapMessage = receiveTestMapMessage(factory);
int unCompressedSize = mapMessage.getContent().getLength();
assertTrue("expected: compressed Size '" + compressedSize + "' < unCompressedSize '" + unCompressedSize + "'", compressedSize < unCompressedSize);
}
public void testStreamMessageCompression() throws Exception {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connectionUri);
factory.setUseCompression(true);
sendTestStreamMessage(factory, TEXT);
ActiveMQStreamMessage streamMessage = receiveTestStreamMessage(factory);
int compressedSize = streamMessage.getContent().getLength();
assertTrue(streamMessage.isCompressed());
boolean booleanVal = streamMessage.readBoolean();
assertTrue(booleanVal);
byte byteVal = streamMessage.readByte();
assertEquals((byte)10, byteVal);
byte[] originVal = TEXT.getBytes();
byte[] bytesVal = new byte[originVal.length];
streamMessage.readBytes(bytesVal);
for (int i = 0; i < bytesVal.length; i++) {
assertTrue(bytesVal[i] == originVal[i]);
}
char charVal = streamMessage.readChar();
assertEquals('A', charVal);
double doubleVal = streamMessage.readDouble();
assertEquals(55.3D, doubleVal, 0.1D);
float floatVal = streamMessage.readFloat();
assertEquals(79.1F, floatVal, 0.1F);
int intVal = streamMessage.readInt();
assertEquals(37, intVal);
long longVal = streamMessage.readLong();
assertEquals(56652L, longVal);
Object objectVal = streamMessage.readObject();
Object origVal = new String("VVVV");
assertTrue(objectVal.equals(origVal));
short shortVal = streamMessage.readShort();
assertEquals((short) 333, shortVal);
String strVal = streamMessage.readString();
assertEquals(TEXT, strVal);
factory = new ActiveMQConnectionFactory(connectionUri);
factory.setUseCompression(false);
sendTestStreamMessage(factory, TEXT);
streamMessage = receiveTestStreamMessage(factory);
int unCompressedSize = streamMessage.getContent().getLength();
System.out.println("compressedSize: " + compressedSize + " un: " + unCompressedSize);
assertTrue("expected: compressed Size '" + compressedSize + "' < unCompressedSize '" + unCompressedSize + "'", compressedSize < unCompressedSize);
}
public void testObjectMessageCompression() throws Exception {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connectionUri);
factory.setUseCompression(true);
sendTestObjectMessage(factory, TEXT);
ActiveMQObjectMessage objectMessage = receiveTestObjectMessage(factory);
int compressedSize = objectMessage.getContent().getLength();
assertTrue(objectMessage.isCompressed());
Object objectVal = objectMessage.getObject();
assertEquals(TEXT, objectVal);
factory = new ActiveMQConnectionFactory(connectionUri);
factory.setUseCompression(false);
sendTestObjectMessage(factory, TEXT);
objectMessage = receiveTestObjectMessage(factory);
int unCompressedSize = objectMessage.getContent().getLength();
assertTrue("expected: compressed Size '" + compressedSize + "' < unCompressedSize '" + unCompressedSize + "'", compressedSize < unCompressedSize);
}
private void sendTestObjectMessage(ActiveMQConnectionFactory factory, String message) throws JMSException {
ActiveMQConnection connection = (ActiveMQConnection) factory.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(queue);
ObjectMessage objectMessage = session.createObjectMessage();
objectMessage.setObject(TEXT);
producer.send(objectMessage);
connection.close();
}
private ActiveMQObjectMessage receiveTestObjectMessage(ActiveMQConnectionFactory factory) throws JMSException {
ActiveMQConnection connection = (ActiveMQConnection) factory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = session.createConsumer(queue);
ActiveMQObjectMessage rc = (ActiveMQObjectMessage) consumer.receive();
connection.close();
return rc;
}
private void sendTestStreamMessage(ActiveMQConnectionFactory factory, String message) throws JMSException {
ActiveMQConnection connection = (ActiveMQConnection) factory.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(queue);
StreamMessage streamMessage = session.createStreamMessage();
streamMessage.writeBoolean(true);
streamMessage.writeByte((byte) 10);
streamMessage.writeBytes(TEXT.getBytes());
streamMessage.writeChar('A');
streamMessage.writeDouble(55.3D);
streamMessage.writeFloat(79.1F);
streamMessage.writeInt(37);
streamMessage.writeLong(56652L);
streamMessage.writeObject(new String("VVVV"));
streamMessage.writeShort((short) 333);
streamMessage.writeString(TEXT);
producer.send(streamMessage);
connection.close();
}
private ActiveMQStreamMessage receiveTestStreamMessage(ActiveMQConnectionFactory factory) throws JMSException {
ActiveMQConnection connection = (ActiveMQConnection) factory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = session.createConsumer(queue);
ActiveMQStreamMessage rc = (ActiveMQStreamMessage) consumer.receive();
connection.close();
return rc;
}
private void sendTestMapMessage(ActiveMQConnectionFactory factory, String message) throws JMSException {
ActiveMQConnection connection = (ActiveMQConnection) factory.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(queue);
MapMessage mapMessage = session.createMapMessage();
mapMessage.setBoolean("boolean-type", true);
mapMessage.setByte("byte-type", (byte) 10);
mapMessage.setBytes("bytes-type", TEXT.getBytes());
mapMessage.setChar("char-type", 'A');
mapMessage.setDouble("double-type", 55.3D);
mapMessage.setFloat("float-type", 79.1F);
mapMessage.setInt("int-type", 37);
mapMessage.setLong("long-type", 56652L);
mapMessage.setObject("object-type", new String("VVVV"));
mapMessage.setShort("short-type", (short) 333);
mapMessage.setString("string-type", TEXT);
producer.send(mapMessage);
connection.close();
}
private ActiveMQMapMessage receiveTestMapMessage(ActiveMQConnectionFactory factory) throws JMSException {
ActiveMQConnection connection = (ActiveMQConnection) factory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = session.createConsumer(queue);
ActiveMQMapMessage rc = (ActiveMQMapMessage) consumer.receive();
connection.close();
return rc;
}
private void sendTestMessage(ActiveMQConnectionFactory factory, String message) throws JMSException { private void sendTestMessage(ActiveMQConnectionFactory factory, String message) throws JMSException {
ActiveMQConnection connection = (ActiveMQConnection) factory.createConnection(); ActiveMQConnection connection = (ActiveMQConnection) factory.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

View File

@ -0,0 +1,263 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.openwire.interop;
import org.apache.activemq.ActiveMQMessageProducer;
import org.apache.activemq.artemis.tests.integration.openwire.BasicOpenWireTest;
import org.apache.activemq.command.ActiveMQDestination;
import org.junit.Before;
import org.junit.Test;
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.ObjectMessage;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.StreamMessage;
import javax.jms.TextMessage;
public class CompressedInteropTest extends BasicOpenWireTest {
private static final String TEXT;
static {
StringBuilder builder = new StringBuilder();
for (int i = 0; i < 20; i++) {
builder.append("The quick red fox jumped over the lazy brown dog. ");
}
TEXT = builder.toString();
}
@Before
@Override
public void setUp() throws Exception {
factory.setUseCompression(true);
super.setUp();
connection.start();
assertTrue(connection.isUseCompression());
}
@Test
public void testCoreReceiveOpenWireCompressedMessages() throws Exception {
//TextMessage
sendCompressedTextMessageUsingOpenWire();
receiveTextMessageUsingCore();
//BytesMessage
sendCompressedBytesMessageUsingOpenWire();
receiveBytesMessageUsingCore();
//MapMessage
sendCompressedMapMessageUsingOpenWire();
receiveMapMessageUsingCore();
//StreamMessage
sendCompressedStreamMessageUsingOpenWire();
receiveStreamMessageUsingCore();
//ObjectMessage
sendCompressedObjectMessageUsingOpenWire();
receiveObjectMessageUsingCore();
}
private void sendCompressedStreamMessageUsingOpenWire() throws Exception {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
ActiveMQDestination destination = createDestination(session, ActiveMQDestination.QUEUE_TYPE);
final ActiveMQMessageProducer producer = (ActiveMQMessageProducer) session.createProducer(destination);
StreamMessage streamMessage = session.createStreamMessage();
streamMessage.writeBoolean(true);
streamMessage.writeByte((byte) 10);
streamMessage.writeBytes(TEXT.getBytes());
streamMessage.writeChar('A');
streamMessage.writeDouble(55.3D);
streamMessage.writeFloat(79.1F);
streamMessage.writeInt(37);
streamMessage.writeLong(56652L);
streamMessage.writeObject(new String("VVVV"));
streamMessage.writeShort((short) 333);
streamMessage.writeString(TEXT);
producer.send(streamMessage);
}
private void receiveStreamMessageUsingCore() throws Exception {
StreamMessage streamMessage = (StreamMessage) receiveMessageUsingCore();
boolean booleanVal = streamMessage.readBoolean();
assertTrue(booleanVal);
byte byteVal = streamMessage.readByte();
assertEquals((byte)10, byteVal);
byte[] originVal = TEXT.getBytes();
byte[] bytesVal = new byte[originVal.length];
streamMessage.readBytes(bytesVal);
for (int i = 0; i < bytesVal.length; i++) {
assertTrue(bytesVal[i] == originVal[i]);
}
char charVal = streamMessage.readChar();
assertEquals('A', charVal);
double doubleVal = streamMessage.readDouble();
assertEquals(55.3D, doubleVal, 0.1D);
float floatVal = streamMessage.readFloat();
assertEquals(79.1F, floatVal, 0.1F);
int intVal = streamMessage.readInt();
assertEquals(37, intVal);
long longVal = streamMessage.readLong();
assertEquals(56652L, longVal);
Object objectVal = streamMessage.readObject();
Object origVal = new String("VVVV");
assertTrue(objectVal.equals(origVal));
short shortVal = streamMessage.readShort();
assertEquals((short) 333, shortVal);
String strVal = streamMessage.readString();
assertEquals(TEXT, strVal);
}
private void sendCompressedObjectMessageUsingOpenWire() throws Exception {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
ActiveMQDestination destination = createDestination(session, ActiveMQDestination.QUEUE_TYPE);
final ActiveMQMessageProducer producer = (ActiveMQMessageProducer) session.createProducer(destination);
ObjectMessage objectMessage = session.createObjectMessage();
objectMessage.setObject(TEXT);
producer.send(objectMessage);
}
private void receiveObjectMessageUsingCore() throws Exception {
ObjectMessage objectMessage = (ObjectMessage) receiveMessageUsingCore();
Object objectVal = objectMessage.getObject();
assertEquals(TEXT, objectVal);
}
private void sendCompressedMapMessageUsingOpenWire() throws Exception {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
ActiveMQDestination destination = createDestination(session, ActiveMQDestination.QUEUE_TYPE);
final ActiveMQMessageProducer producer = (ActiveMQMessageProducer) session.createProducer(destination);
MapMessage mapMessage = session.createMapMessage();
mapMessage.setBoolean("boolean-type", true);
mapMessage.setByte("byte-type", (byte) 10);
mapMessage.setBytes("bytes-type", TEXT.getBytes());
mapMessage.setChar("char-type", 'A');
mapMessage.setDouble("double-type", 55.3D);
mapMessage.setFloat("float-type", 79.1F);
mapMessage.setInt("int-type", 37);
mapMessage.setLong("long-type", 56652L);
mapMessage.setObject("object-type", new String("VVVV"));
mapMessage.setShort("short-type", (short) 333);
mapMessage.setString("string-type", TEXT);
producer.send(mapMessage);
}
private void receiveMapMessageUsingCore() throws Exception {
MapMessage mapMessage = (MapMessage) receiveMessageUsingCore();
boolean booleanVal = mapMessage.getBoolean("boolean-type");
assertTrue(booleanVal);
byte byteVal = mapMessage.getByte("byte-type");
assertEquals((byte)10, byteVal);
byte[] bytesVal = mapMessage.getBytes("bytes-type");
byte[] originVal = TEXT.getBytes();
assertEquals(originVal.length, bytesVal.length);
for (int i = 0; i < bytesVal.length; i++) {
assertTrue(bytesVal[i] == originVal[i]);
}
char charVal = mapMessage.getChar("char-type");
assertEquals('A', charVal);
double doubleVal = mapMessage.getDouble("double-type");
assertEquals(55.3D, doubleVal, 0.1D);
float floatVal = mapMessage.getFloat("float-type");
assertEquals(79.1F, floatVal, 0.1F);
int intVal = mapMessage.getInt("int-type");
assertEquals(37, intVal);
long longVal = mapMessage.getLong("long-type");
assertEquals(56652L, longVal);
Object objectVal = mapMessage.getObject("object-type");
Object origVal = new String("VVVV");
assertTrue(objectVal.equals(origVal));
short shortVal = mapMessage.getShort("short-type");
assertEquals((short) 333, shortVal);
String strVal = mapMessage.getString("string-type");
assertEquals(TEXT, strVal);
}
private void sendCompressedBytesMessageUsingOpenWire() throws Exception {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
ActiveMQDestination destination = createDestination(session, ActiveMQDestination.QUEUE_TYPE);
final ActiveMQMessageProducer producer = (ActiveMQMessageProducer) session.createProducer(destination);
BytesMessage bytesMessage = session.createBytesMessage();
bytesMessage.writeBytes(TEXT.getBytes());
producer.send(bytesMessage);
}
private void receiveBytesMessageUsingCore() throws Exception {
BytesMessage bytesMessage = (BytesMessage) receiveMessageUsingCore();
byte[] bytes = new byte[TEXT.getBytes("UTF8").length];
bytesMessage.readBytes(bytes);
assertTrue(bytesMessage.readBytes(new byte[255]) == -1);
String rcvString = new String(bytes, "UTF8");
assertEquals(TEXT, rcvString);
}
private void receiveTextMessageUsingCore() throws Exception {
TextMessage txtMessage = (TextMessage) receiveMessageUsingCore();
assertEquals(TEXT, txtMessage.getText());
}
private Message receiveMessageUsingCore() throws Exception {
Connection jmsConn = null;
Message message = null;
try {
jmsConn = coreCf.createConnection();
jmsConn.start();
Session session = jmsConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(this.queueName);
MessageConsumer coreConsumer = session.createConsumer(queue);
message = coreConsumer.receive(5000);
}
finally {
if (jmsConn != null) {
jmsConn.close();
}
}
return message;
}
private void sendCompressedTextMessageUsingOpenWire() throws Exception {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
ActiveMQDestination destination = createDestination(session, ActiveMQDestination.QUEUE_TYPE);
final ActiveMQMessageProducer producer = (ActiveMQMessageProducer) session.createProducer(destination);
TextMessage textMessage = session.createTextMessage(TEXT);
producer.send(textMessage);
}
}