From e86acd4824d3a94d124693fbd7887a1f1b9b072c Mon Sep 17 00:00:00 2001 From: Clebert Suconic Date: Thu, 22 Mar 2018 10:40:59 -0400 Subject: [PATCH] ARTEMIS-1765 Fixing Large Message Compression and Conversion --- .../artemis/api/core/ICoreMessage.java | 7 ++ .../core/message/impl/CoreMessage.java | 65 +++++++++++++++++++ .../amqp/converter/jms/ServerJMSMessage.java | 2 +- .../protocol/mqtt/MQTTPublishManager.java | 4 +- .../protocol/mqtt/MQTTSessionCallback.java | 1 - .../mqtt/MQTTSubscriptionManager.java | 2 +- .../openwire/OpenWireMessageConverter.java | 2 +- .../core/protocol/stomp/StompSession.java | 38 +---------- .../integration/client/ConsumerTest.java | 20 +++++- 9 files changed, 96 insertions(+), 45 deletions(-) diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/ICoreMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/ICoreMessage.java index 179f8c5876..f0eb1b675d 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/ICoreMessage.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/ICoreMessage.java @@ -41,6 +41,13 @@ public interface ICoreMessage extends Message { */ ActiveMQBuffer getReadOnlyBodyBuffer(); + /** + * Returns a readOnlyBodyBuffer or a decompressed one if the message is compressed. + * or the large message buffer. + * @return + */ + ActiveMQBuffer getDataBuffer(); + /** * Return the type of the message */ diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java index 5ed46cdd41..09188281d4 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java @@ -21,8 +21,11 @@ import java.io.InputStream; import java.nio.ByteBuffer; import java.util.LinkedList; import java.util.Set; +import java.util.zip.DataFormatException; +import java.util.zip.Inflater; import io.netty.buffer.ByteBuf; +import io.netty.buffer.UnpooledByteBufAllocator; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQBuffers; import org.apache.activemq.artemis.api.core.ActiveMQException; @@ -38,6 +41,7 @@ import org.apache.activemq.artemis.core.message.LargeBodyEncoder; import org.apache.activemq.artemis.core.persistence.Persister; import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl; import org.apache.activemq.artemis.reader.MessageUtil; +import org.apache.activemq.artemis.utils.ByteUtil; import org.apache.activemq.artemis.utils.DataConstants; import org.apache.activemq.artemis.utils.UUID; import org.apache.activemq.artemis.utils.collections.TypedProperties; @@ -213,6 +217,67 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage { return new ChannelBufferWrapper(buffer.slice(BODY_OFFSET, endOfBodyPosition - BUFFER_HEADER_SPACE).setIndex(0, endOfBodyPosition - BUFFER_HEADER_SPACE).asReadOnly()); } + /** + * This will return the proper buffer to represent the data of the Message. If compressed it will decompress. + * If large, it will read from the file or streaming. + * @return + * @throws ActiveMQException + */ + @Override + public ActiveMQBuffer getDataBuffer() { + + ActiveMQBuffer buffer; + + try { + if (isLargeMessage()) { + buffer = getLargeMessageBuffer(); + } else { + buffer = getReadOnlyBodyBuffer(); + } + + if (Boolean.TRUE.equals(getBooleanProperty(Message.HDR_LARGE_COMPRESSED))) { + buffer = inflate(buffer); + } + } catch (Exception e) { + logger.warn(e.getMessage(), e); + return getReadOnlyBodyBuffer(); + } + + return buffer; + } + + private ActiveMQBuffer getLargeMessageBuffer() throws ActiveMQException { + ActiveMQBuffer buffer; + LargeBodyEncoder encoder = getBodyEncoder(); + encoder.open(); + int bodySize = (int) encoder.getLargeBodySize(); + + buffer = new ChannelBufferWrapper(UnpooledByteBufAllocator.DEFAULT.heapBuffer(bodySize)); + + encoder.encode(buffer, bodySize); + encoder.close(); + return buffer; + } + + private ActiveMQBuffer inflate(ActiveMQBuffer buffer) throws DataFormatException { + int bytesToRead = buffer.readableBytes(); + Inflater inflater = new Inflater(); + inflater.setInput(ByteUtil.getActiveArray(buffer.readBytes(bytesToRead).toByteBuffer())); + + //get the real size of large message + long sizeBody = getLongProperty(Message.HDR_LARGE_BODY_SIZE); + + byte[] data = new byte[(int) sizeBody]; + inflater.inflate(data); + inflater.end(); + ActiveMQBuffer qbuff = ActiveMQBuffers.wrappedBuffer(data); + qbuff.resetReaderIndex(); + qbuff.resetWriterIndex(); + qbuff.writeBytes(data); + buffer = qbuff; + return buffer; + } + @Override public SimpleString getGroupID() { return this.getSimpleStringProperty(Message.HDR_GROUP_ID); diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSMessage.java index b070591c42..0ef2041fec 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSMessage.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSMessage.java @@ -73,7 +73,7 @@ public class ServerJMSMessage implements Message { protected ActiveMQBuffer getReadBodyBuffer() { if (readBodyBuffer == null) { // to avoid clashes between multiple threads - readBodyBuffer = message.getReadOnlyBodyBuffer(); + readBodyBuffer = message.getDataBuffer(); } return readBodyBuffer; } diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java index febc364ae1..667f5be46a 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java @@ -269,7 +269,7 @@ public class MQTTPublishManager { switch (message.getType()) { case Message.TEXT_TYPE: try { - SimpleString text = message.getReadOnlyBodyBuffer().readNullableSimpleString(); + SimpleString text = message.getDataBuffer().readNullableSimpleString(); byte[] stringPayload = text.toString().getBytes("UTF-8"); payload = ByteBufAllocator.DEFAULT.buffer(stringPayload.length); payload.writeBytes(stringPayload); @@ -278,7 +278,7 @@ public class MQTTPublishManager { log.warn("Unable to send message: " + message.getMessageID() + " Cause: " + e.getMessage(), e); } default: - ActiveMQBuffer bodyBuffer = message.getReadOnlyBodyBuffer(); + ActiveMQBuffer bodyBuffer = message.getDataBuffer(); payload = ByteBufAllocator.DEFAULT.buffer(bodyBuffer.writerIndex()); payload.writeBytes(bodyBuffer.byteBuf()); break; diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java index 39e2ba94ed..a49cf116c6 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java @@ -64,7 +64,6 @@ public class MQTTSessionCallback implements SessionCallback { byte[] body, boolean continues, boolean requiresResponse) { - log.warn("Sending LARGE MESSAGE"); return 1; } diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java index 49ab5d9196..4093f5e28c 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java @@ -154,7 +154,7 @@ public class MQTTSubscriptionManager { */ private void createConsumerForSubscriptionQueue(Queue queue, String topic, int qos) throws Exception { long cid = session.getServer().getStorageManager().generateID(); - ServerConsumer consumer = session.getServerSession().createConsumer(cid, queue.getName(), null, false, true, -1); + ServerConsumer consumer = session.getServerSession().createConsumer(cid, queue.getName(), null, false, false, -1); consumer.setStarted(true); consumers.put(topic, consumer); diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java index 66fff2d353..fd1f7320d5 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java @@ -513,7 +513,7 @@ public final class OpenWireMessageConverter { final Boolean compressProp = (Boolean) coreMessage.getObjectProperty(AMQ_MSG_COMPRESSED); final boolean isCompressed = compressProp == null ? false : compressProp.booleanValue(); final byte[] bytes; - final ActiveMQBuffer buffer = coreMessage.getReadOnlyBodyBuffer(); + final ActiveMQBuffer buffer = coreMessage.getDataBuffer(); buffer.resetReaderIndex(); switch (coreType) { diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java index 0a12b47953..8a573e6c3a 100644 --- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java +++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java @@ -23,17 +23,13 @@ import java.util.Set; import java.util.concurrent.BlockingDeque; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.LinkedBlockingDeque; -import java.util.zip.Inflater; -import io.netty.buffer.UnpooledByteBufAllocator; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ICoreMessage; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.Pair; import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; -import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper; -import org.apache.activemq.artemis.core.message.LargeBodyEncoder; import org.apache.activemq.artemis.core.message.impl.CoreMessage; import org.apache.activemq.artemis.core.persistence.OperationContext; import org.apache.activemq.artemis.core.persistence.StorageManager; @@ -47,7 +43,6 @@ import org.apache.activemq.artemis.core.server.impl.ServerSessionImpl; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; import org.apache.activemq.artemis.spi.core.protocol.SessionCallback; import org.apache.activemq.artemis.spi.core.remoting.ReadyListener; -import org.apache.activemq.artemis.utils.ByteUtil; import org.apache.activemq.artemis.utils.ConfigurationHelper; import org.apache.activemq.artemis.utils.PendingTask; import org.apache.activemq.artemis.utils.UUIDGenerator; @@ -142,38 +137,7 @@ public class StompSession implements SessionCallback { if (subscription == null) return 0; StompFrame frame; - ActiveMQBuffer buffer; - - if (coreMessage.isLargeMessage()) { - LargeBodyEncoder encoder = coreMessage.getBodyEncoder(); - encoder.open(); - int bodySize = (int) encoder.getLargeBodySize(); - - buffer = new ChannelBufferWrapper(UnpooledByteBufAllocator.DEFAULT.heapBuffer(bodySize)); - - encoder.encode(buffer, bodySize); - encoder.close(); - } else { - buffer = coreMessage.getReadOnlyBodyBuffer(); - } - - if (Boolean.TRUE.equals(serverMessage.getBooleanProperty(Message.HDR_LARGE_COMPRESSED))) { - ActiveMQBuffer qbuff = buffer; - int bytesToRead = qbuff.readerIndex(); - Inflater inflater = new Inflater(); - inflater.setInput(ByteUtil.getActiveArray(qbuff.readBytes(bytesToRead).toByteBuffer())); - - //get the real size of large message - long sizeBody = newServerMessage.getLongProperty(Message.HDR_LARGE_BODY_SIZE); - - byte[] data = new byte[(int) sizeBody]; - inflater.inflate(data); - inflater.end(); - qbuff.resetReaderIndex(); - qbuff.resetWriterIndex(); - qbuff.writeBytes(data); - buffer = qbuff; - } + ActiveMQBuffer buffer = coreMessage.getDataBuffer(); frame = connection.createStompMessage(newServerMessage, buffer, subscription, deliveryCount); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerTest.java index 0b36e18928..f103d4182f 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerTest.java @@ -421,7 +421,10 @@ public class ConsumerTest extends ActiveMQTestBase { private ConnectionFactory createFactory(int protocol) { switch (protocol) { - case 1: return new ActiveMQConnectionFactory();// core protocol + case 1: ActiveMQConnectionFactory coreCF = new ActiveMQConnectionFactory();// core protocol + coreCF.setCompressLargeMessage(true); + coreCF.setMinLargeMessageSize(10 * 1024); + return coreCF; case 2: return new JmsConnectionFactory("amqp://localhost:61616"); // amqp case 3: return new org.apache.activemq.ActiveMQConnectionFactory("tcp://localhost:61616"); // openwire default: return null; @@ -446,7 +449,15 @@ public class ConsumerTest extends ActiveMQTestBase { TextMessage msg = session.createTextMessage("hello"); msg.setIntProperty("mycount", 0); producer.send(msg); - connection.close(); + + StringBuffer bufferLarge = new StringBuffer(); + while (bufferLarge.length() < 100 * 1024) { + bufferLarge.append(" "); + } + + msg = session.createTextMessage(bufferLarge.toString()); + msg.setIntProperty("mycount", 1); + producer.send(msg); connection = factoryConsume.createConnection(); session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); @@ -461,6 +472,11 @@ public class ConsumerTest extends ActiveMQTestBase { Assert.assertEquals(0, message.getIntProperty("mycount")); Assert.assertEquals("hello", message.getText()); + message = (TextMessage) consumer.receive(1000); + Assert.assertNotNull(message); + Assert.assertEquals(1, message.getIntProperty("mycount")); + Assert.assertEquals(bufferLarge.toString(), message.getText()); + Wait.waitFor(() -> server.getPagingManager().getGlobalSize() == 0, 5000, 100); Assert.assertEquals(0, server.getPagingManager().getGlobalSize());