diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java index 5b228f2d5a..bedf6941f3 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java @@ -70,7 +70,11 @@ public final class LargeServerMessageImpl extends CoreMessage implements CoreLar final int readableBytes = buffer.readableBytes(); lsm.addBytes(buffer); lsm.releaseResources(true, true); - lsm.toMessage().putLongProperty(Message.HDR_LARGE_BODY_SIZE, readableBytes); + + if (!coreMessage.containsProperty(Message.HDR_LARGE_BODY_SIZE)) { + lsm.toMessage().putLongProperty(Message.HDR_LARGE_BODY_SIZE, readableBytes); + } + return lsm.toMessage(); } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageCompressTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageCompressTest.java index eaad11e29a..a4e1a722cb 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageCompressTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageCompressTest.java @@ -22,7 +22,11 @@ import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.Random; import java.util.concurrent.atomic.AtomicLong; +import java.util.zip.Deflater; import io.netty.util.internal.PlatformDependent; import org.apache.activemq.artemis.api.core.Message; @@ -37,6 +41,7 @@ import org.apache.activemq.artemis.api.core.client.ServerLocator; import org.apache.activemq.artemis.core.config.StoreConfiguration; import org.apache.activemq.artemis.core.management.impl.QueueControlImpl; import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; import org.apache.activemq.artemis.utils.RandomUtil; import org.junit.Assert; import org.junit.Assume; @@ -45,6 +50,13 @@ import org.junit.Test; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.not; +import javax.jms.BytesMessage; +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.Destination; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; import javax.management.openmbean.CompositeData; /** @@ -496,4 +508,157 @@ public class LargeMessageCompressTest extends LargeMessageTest { public void testSendServerMessage() throws Exception { // doesn't make sense as compressed } + + + // https://issues.apache.org/jira/projects/ARTEMIS/issues/ARTEMIS-3751 + @Test + public void testOverrideSize() throws Exception { + ActiveMQServer server = createServer(true, true); + + server.start(); + + ConnectionFactory cf = new ActiveMQConnectionFactory("tcp://localhost:61616?minLargeMessageSize=" + (200 * 1024) + "&compressLargeMessage=true"); + Connection connection = cf.createConnection(); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + connection.start(); + Destination queue = session.createQueue("testQueue"); + + MessageProducer producer = session.createProducer(queue); + MessageConsumer consumer = session.createConsumer(queue); + + // What we want is to get a message witch size is larger than client's minLargeMessageSize (200 kibs here) while compressed the size must be lower than that + // and at same time it must be greater than server minLargeMessageSize (which is 100 kibs by default) + byte[] data = new byte[1024 * 300]; + new DeflateGenerator(new Random(42), 2.0, 2.0).generate(data, 2); + + assertCompressionSize(data, 100 * 1024, 200 * 1024); + + BytesMessage outMessage = session.createBytesMessage(); + outMessage.writeBytes(data); + + producer.send(outMessage); + + javax.jms.Message inMessage = consumer.receive(1000); + + assertEqualsByteArrays(data, inMessage.getBody(byte[].class)); + } + + private void assertCompressionSize(byte[] data, int min, int max) { + byte[] output = new byte[data.length]; + + Deflater deflater = new Deflater(); + deflater.setInput(data); + deflater.finish(); + int compressed = deflater.deflate(output); + deflater.end(); + + assert compressed > min && compressed < max; + } + + /** + * Generate compressible data. + *
+ * Based on "SDGen: Mimicking Datasets for Content Generation in Storage + * Benchmarks" by Raúl Gracia-Tinedo et al. (https://www.usenix.org/node/188461) + * and https://github.com/jibsen/lzdatagen + */ + public static class DeflateGenerator { + + private static final int MIN_LENGTH = 3; + private static final int MAX_LENGTH = 258; + private static final int NUM_LENGTH = MAX_LENGTH - MIN_LENGTH; + + private static final int LENGTH_PER_CHUNK = 512; + + private final Random rnd; + + private final double dataExp; + private final double lengthExp; + + public DeflateGenerator(Random rnd, double dataExp, double lengthExp) { + this.rnd = rnd; + this.dataExp = dataExp; + this.lengthExp = lengthExp; + } + + private void nextBytes(byte[] buffer, int size) { + for (int i = 0; i < size; i++) { + buffer[i] = ((byte) ((double) 256 * Math.pow(rnd.nextDouble(), dataExp))); + } + } + + private byte[] nextBytes(int size) { + byte[] buffer = new byte[size]; + nextBytes(buffer, size); + return buffer; + } + + private void nextLengthFrequencies(int[] frequencies) { + Arrays.fill(frequencies, 0); + + for (int i = 0; i < LENGTH_PER_CHUNK; i++) { + int length = (int) ((double) frequencies.length * Math.pow(rnd.nextDouble(), lengthExp)); + + frequencies[length]++; + } + } + + public void generate(byte[] result, double ratio) { + ByteBuffer generated = generate(result.length, ratio); + generated.get(result); + } + + public ByteBuffer generate(int size, double ratio) { + ByteBuffer result = ByteBuffer.allocate(size); + + byte[] buffer = new byte[MAX_LENGTH]; + int[] frequencies = new int[NUM_LENGTH]; + + int length = 0; + int i = 0; + boolean repeat = false; + + while (i < size) { + while (frequencies[length] == 0) { + if (length == 0) { + nextBytes(buffer, MAX_LENGTH); + nextLengthFrequencies(frequencies); + + length = NUM_LENGTH; + } + + length--; + } + + int len = length + MIN_LENGTH; + frequencies[length]--; + + if (len > size - i) { + len = size - i; + } + + if (rnd.nextDouble() < 1.0 / ratio) { + result.put(nextBytes(len)); + repeat = false; + } else { + if (repeat) { + result.put(nextBytes(1)); + i++; + } + + result.put(buffer, 0, len); + + repeat = true; + } + + i += len; + } + + return result.flip(); + } + } + + }