ARTEMIS-1765 Fixing Large Message Compression and Conversion

This commit is contained in:
Clebert Suconic 2018-03-22 10:40:59 -04:00
parent 51f105da7b
commit e86acd4824
9 changed files with 96 additions and 45 deletions

View File

@ -41,6 +41,13 @@ public interface ICoreMessage extends Message {
*/ */
ActiveMQBuffer getReadOnlyBodyBuffer(); ActiveMQBuffer getReadOnlyBodyBuffer();
/**
* Returns a readOnlyBodyBuffer or a decompressed one if the message is compressed.
* or the large message buffer.
* @return
*/
ActiveMQBuffer getDataBuffer();
/** /**
* Return the type of the message * Return the type of the message
*/ */

View File

@ -21,8 +21,11 @@ import java.io.InputStream;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.Set; import java.util.Set;
import java.util.zip.DataFormatException;
import java.util.zip.Inflater;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.buffer.UnpooledByteBufAllocator;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQBuffers; import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQException;
@ -38,6 +41,7 @@ import org.apache.activemq.artemis.core.message.LargeBodyEncoder;
import org.apache.activemq.artemis.core.persistence.Persister; import org.apache.activemq.artemis.core.persistence.Persister;
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl; import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
import org.apache.activemq.artemis.reader.MessageUtil; import org.apache.activemq.artemis.reader.MessageUtil;
import org.apache.activemq.artemis.utils.ByteUtil;
import org.apache.activemq.artemis.utils.DataConstants; import org.apache.activemq.artemis.utils.DataConstants;
import org.apache.activemq.artemis.utils.UUID; import org.apache.activemq.artemis.utils.UUID;
import org.apache.activemq.artemis.utils.collections.TypedProperties; import org.apache.activemq.artemis.utils.collections.TypedProperties;
@ -213,6 +217,67 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
return new ChannelBufferWrapper(buffer.slice(BODY_OFFSET, endOfBodyPosition - BUFFER_HEADER_SPACE).setIndex(0, endOfBodyPosition - BUFFER_HEADER_SPACE).asReadOnly()); return new ChannelBufferWrapper(buffer.slice(BODY_OFFSET, endOfBodyPosition - BUFFER_HEADER_SPACE).setIndex(0, endOfBodyPosition - BUFFER_HEADER_SPACE).asReadOnly());
} }
/**
* This will return the proper buffer to represent the data of the Message. If compressed it will decompress.
* If large, it will read from the file or streaming.
* @return
* @throws ActiveMQException
*/
@Override
public ActiveMQBuffer getDataBuffer() {
ActiveMQBuffer buffer;
try {
if (isLargeMessage()) {
buffer = getLargeMessageBuffer();
} else {
buffer = getReadOnlyBodyBuffer();
}
if (Boolean.TRUE.equals(getBooleanProperty(Message.HDR_LARGE_COMPRESSED))) {
buffer = inflate(buffer);
}
} catch (Exception e) {
logger.warn(e.getMessage(), e);
return getReadOnlyBodyBuffer();
}
return buffer;
}
private ActiveMQBuffer getLargeMessageBuffer() throws ActiveMQException {
ActiveMQBuffer buffer;
LargeBodyEncoder encoder = getBodyEncoder();
encoder.open();
int bodySize = (int) encoder.getLargeBodySize();
buffer = new ChannelBufferWrapper(UnpooledByteBufAllocator.DEFAULT.heapBuffer(bodySize));
encoder.encode(buffer, bodySize);
encoder.close();
return buffer;
}
private ActiveMQBuffer inflate(ActiveMQBuffer buffer) throws DataFormatException {
int bytesToRead = buffer.readableBytes();
Inflater inflater = new Inflater();
inflater.setInput(ByteUtil.getActiveArray(buffer.readBytes(bytesToRead).toByteBuffer()));
//get the real size of large message
long sizeBody = getLongProperty(Message.HDR_LARGE_BODY_SIZE);
byte[] data = new byte[(int) sizeBody];
inflater.inflate(data);
inflater.end();
ActiveMQBuffer qbuff = ActiveMQBuffers.wrappedBuffer(data);
qbuff.resetReaderIndex();
qbuff.resetWriterIndex();
qbuff.writeBytes(data);
buffer = qbuff;
return buffer;
}
@Override @Override
public SimpleString getGroupID() { public SimpleString getGroupID() {
return this.getSimpleStringProperty(Message.HDR_GROUP_ID); return this.getSimpleStringProperty(Message.HDR_GROUP_ID);

View File

@ -73,7 +73,7 @@ public class ServerJMSMessage implements Message {
protected ActiveMQBuffer getReadBodyBuffer() { protected ActiveMQBuffer getReadBodyBuffer() {
if (readBodyBuffer == null) { if (readBodyBuffer == null) {
// to avoid clashes between multiple threads // to avoid clashes between multiple threads
readBodyBuffer = message.getReadOnlyBodyBuffer(); readBodyBuffer = message.getDataBuffer();
} }
return readBodyBuffer; return readBodyBuffer;
} }

View File

@ -269,7 +269,7 @@ public class MQTTPublishManager {
switch (message.getType()) { switch (message.getType()) {
case Message.TEXT_TYPE: case Message.TEXT_TYPE:
try { try {
SimpleString text = message.getReadOnlyBodyBuffer().readNullableSimpleString(); SimpleString text = message.getDataBuffer().readNullableSimpleString();
byte[] stringPayload = text.toString().getBytes("UTF-8"); byte[] stringPayload = text.toString().getBytes("UTF-8");
payload = ByteBufAllocator.DEFAULT.buffer(stringPayload.length); payload = ByteBufAllocator.DEFAULT.buffer(stringPayload.length);
payload.writeBytes(stringPayload); payload.writeBytes(stringPayload);
@ -278,7 +278,7 @@ public class MQTTPublishManager {
log.warn("Unable to send message: " + message.getMessageID() + " Cause: " + e.getMessage(), e); log.warn("Unable to send message: " + message.getMessageID() + " Cause: " + e.getMessage(), e);
} }
default: default:
ActiveMQBuffer bodyBuffer = message.getReadOnlyBodyBuffer(); ActiveMQBuffer bodyBuffer = message.getDataBuffer();
payload = ByteBufAllocator.DEFAULT.buffer(bodyBuffer.writerIndex()); payload = ByteBufAllocator.DEFAULT.buffer(bodyBuffer.writerIndex());
payload.writeBytes(bodyBuffer.byteBuf()); payload.writeBytes(bodyBuffer.byteBuf());
break; break;

View File

@ -64,7 +64,6 @@ public class MQTTSessionCallback implements SessionCallback {
byte[] body, byte[] body,
boolean continues, boolean continues,
boolean requiresResponse) { boolean requiresResponse) {
log.warn("Sending LARGE MESSAGE");
return 1; return 1;
} }

View File

@ -154,7 +154,7 @@ public class MQTTSubscriptionManager {
*/ */
private void createConsumerForSubscriptionQueue(Queue queue, String topic, int qos) throws Exception { private void createConsumerForSubscriptionQueue(Queue queue, String topic, int qos) throws Exception {
long cid = session.getServer().getStorageManager().generateID(); long cid = session.getServer().getStorageManager().generateID();
ServerConsumer consumer = session.getServerSession().createConsumer(cid, queue.getName(), null, false, true, -1); ServerConsumer consumer = session.getServerSession().createConsumer(cid, queue.getName(), null, false, false, -1);
consumer.setStarted(true); consumer.setStarted(true);
consumers.put(topic, consumer); consumers.put(topic, consumer);

View File

@ -513,7 +513,7 @@ public final class OpenWireMessageConverter {
final Boolean compressProp = (Boolean) coreMessage.getObjectProperty(AMQ_MSG_COMPRESSED); final Boolean compressProp = (Boolean) coreMessage.getObjectProperty(AMQ_MSG_COMPRESSED);
final boolean isCompressed = compressProp == null ? false : compressProp.booleanValue(); final boolean isCompressed = compressProp == null ? false : compressProp.booleanValue();
final byte[] bytes; final byte[] bytes;
final ActiveMQBuffer buffer = coreMessage.getReadOnlyBodyBuffer(); final ActiveMQBuffer buffer = coreMessage.getDataBuffer();
buffer.resetReaderIndex(); buffer.resetReaderIndex();
switch (coreType) { switch (coreType) {

View File

@ -23,17 +23,13 @@ import java.util.Set;
import java.util.concurrent.BlockingDeque; import java.util.concurrent.BlockingDeque;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.LinkedBlockingDeque;
import java.util.zip.Inflater;
import io.netty.buffer.UnpooledByteBufAllocator;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ICoreMessage; import org.apache.activemq.artemis.api.core.ICoreMessage;
import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.Pair; import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper;
import org.apache.activemq.artemis.core.message.LargeBodyEncoder;
import org.apache.activemq.artemis.core.message.impl.CoreMessage; import org.apache.activemq.artemis.core.message.impl.CoreMessage;
import org.apache.activemq.artemis.core.persistence.OperationContext; import org.apache.activemq.artemis.core.persistence.OperationContext;
import org.apache.activemq.artemis.core.persistence.StorageManager; import org.apache.activemq.artemis.core.persistence.StorageManager;
@ -47,7 +43,6 @@ import org.apache.activemq.artemis.core.server.impl.ServerSessionImpl;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.spi.core.protocol.SessionCallback; import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
import org.apache.activemq.artemis.spi.core.remoting.ReadyListener; import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
import org.apache.activemq.artemis.utils.ByteUtil;
import org.apache.activemq.artemis.utils.ConfigurationHelper; import org.apache.activemq.artemis.utils.ConfigurationHelper;
import org.apache.activemq.artemis.utils.PendingTask; import org.apache.activemq.artemis.utils.PendingTask;
import org.apache.activemq.artemis.utils.UUIDGenerator; import org.apache.activemq.artemis.utils.UUIDGenerator;
@ -142,38 +137,7 @@ public class StompSession implements SessionCallback {
if (subscription == null) if (subscription == null)
return 0; return 0;
StompFrame frame; StompFrame frame;
ActiveMQBuffer buffer; ActiveMQBuffer buffer = coreMessage.getDataBuffer();
if (coreMessage.isLargeMessage()) {
LargeBodyEncoder encoder = coreMessage.getBodyEncoder();
encoder.open();
int bodySize = (int) encoder.getLargeBodySize();
buffer = new ChannelBufferWrapper(UnpooledByteBufAllocator.DEFAULT.heapBuffer(bodySize));
encoder.encode(buffer, bodySize);
encoder.close();
} else {
buffer = coreMessage.getReadOnlyBodyBuffer();
}
if (Boolean.TRUE.equals(serverMessage.getBooleanProperty(Message.HDR_LARGE_COMPRESSED))) {
ActiveMQBuffer qbuff = buffer;
int bytesToRead = qbuff.readerIndex();
Inflater inflater = new Inflater();
inflater.setInput(ByteUtil.getActiveArray(qbuff.readBytes(bytesToRead).toByteBuffer()));
//get the real size of large message
long sizeBody = newServerMessage.getLongProperty(Message.HDR_LARGE_BODY_SIZE);
byte[] data = new byte[(int) sizeBody];
inflater.inflate(data);
inflater.end();
qbuff.resetReaderIndex();
qbuff.resetWriterIndex();
qbuff.writeBytes(data);
buffer = qbuff;
}
frame = connection.createStompMessage(newServerMessage, buffer, subscription, deliveryCount); frame = connection.createStompMessage(newServerMessage, buffer, subscription, deliveryCount);

View File

@ -421,7 +421,10 @@ public class ConsumerTest extends ActiveMQTestBase {
private ConnectionFactory createFactory(int protocol) { private ConnectionFactory createFactory(int protocol) {
switch (protocol) { switch (protocol) {
case 1: return new ActiveMQConnectionFactory();// core protocol case 1: ActiveMQConnectionFactory coreCF = new ActiveMQConnectionFactory();// core protocol
coreCF.setCompressLargeMessage(true);
coreCF.setMinLargeMessageSize(10 * 1024);
return coreCF;
case 2: return new JmsConnectionFactory("amqp://localhost:61616"); // amqp case 2: return new JmsConnectionFactory("amqp://localhost:61616"); // amqp
case 3: return new org.apache.activemq.ActiveMQConnectionFactory("tcp://localhost:61616"); // openwire case 3: return new org.apache.activemq.ActiveMQConnectionFactory("tcp://localhost:61616"); // openwire
default: return null; default: return null;
@ -446,7 +449,15 @@ public class ConsumerTest extends ActiveMQTestBase {
TextMessage msg = session.createTextMessage("hello"); TextMessage msg = session.createTextMessage("hello");
msg.setIntProperty("mycount", 0); msg.setIntProperty("mycount", 0);
producer.send(msg); producer.send(msg);
connection.close();
StringBuffer bufferLarge = new StringBuffer();
while (bufferLarge.length() < 100 * 1024) {
bufferLarge.append(" ");
}
msg = session.createTextMessage(bufferLarge.toString());
msg.setIntProperty("mycount", 1);
producer.send(msg);
connection = factoryConsume.createConnection(); connection = factoryConsume.createConnection();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
@ -461,6 +472,11 @@ public class ConsumerTest extends ActiveMQTestBase {
Assert.assertEquals(0, message.getIntProperty("mycount")); Assert.assertEquals(0, message.getIntProperty("mycount"));
Assert.assertEquals("hello", message.getText()); Assert.assertEquals("hello", message.getText());
message = (TextMessage) consumer.receive(1000);
Assert.assertNotNull(message);
Assert.assertEquals(1, message.getIntProperty("mycount"));
Assert.assertEquals(bufferLarge.toString(), message.getText());
Wait.waitFor(() -> server.getPagingManager().getGlobalSize() == 0, 5000, 100); Wait.waitFor(() -> server.getPagingManager().getGlobalSize() == 0, 5000, 100);
Assert.assertEquals(0, server.getPagingManager().getGlobalSize()); Assert.assertEquals(0, server.getPagingManager().getGlobalSize());