ARTEMIS-3751 Do not override large message size

It is possible to receive a compressed message from the client as a regular message. Such a message already contains the correct body size, which takes compression into account, so the broker must not override it.
iliya 2022-04-17 13:15:47 +03:00 committed by Clebert Suconic
parent da5d5e504f
commit 20370d2920
2 changed files with 170 additions and 1 deletion
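
For illustration, a minimal client-side sketch of the scenario the commit message describes, assuming a broker on tcp://localhost:61616 with its default 100 KiB minLargeMessageSize; the class name, queue name and payload are placeholders, and the test added below exercises this path end to end:

import javax.jms.*;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;

public class CompressedSendSketch {
   public static void main(String[] args) throws Exception {
      // Hypothetical payload: ~300 KiB that deflates to somewhere between 100 KiB and 200 KiB
      // (fill it with compressible but non-trivial data, e.g. via the DeflateGenerator added below).
      byte[] compressibleData = new byte[300 * 1024];
      // Client threshold (200 KiB) is above the broker default (100 KiB) and compression is enabled.
      ConnectionFactory cf = new ActiveMQConnectionFactory(
            "tcp://localhost:61616?minLargeMessageSize=" + (200 * 1024) + "&compressLargeMessage=true");
      try (Connection connection = cf.createConnection()) {
         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
         MessageProducer producer = session.createProducer(session.createQueue("testQueue"));
         BytesMessage message = session.createBytesMessage();
         message.writeBytes(compressibleData);
         // The client compresses the body; once it falls below the client's 200 KiB threshold it is
         // sent as a regular message that already carries HDR_LARGE_BODY_SIZE set by the client,
         // which the broker must not override when it turns the message into a large server message.
         producer.send(message);
      }
   }
}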

@@ -70,7 +70,11 @@ public final class LargeServerMessageImpl extends CoreMessage implements CoreLar
   final int readableBytes = buffer.readableBytes();
   lsm.addBytes(buffer);
   lsm.releaseResources(true, true);
-  lsm.toMessage().putLongProperty(Message.HDR_LARGE_BODY_SIZE, readableBytes);
+  if (!coreMessage.containsProperty(Message.HDR_LARGE_BODY_SIZE)) {
+     lsm.toMessage().putLongProperty(Message.HDR_LARGE_BODY_SIZE, readableBytes);
+  }
   return lsm.toMessage();
}

@@ -22,7 +22,11 @@ import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Random;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.zip.Deflater;
import io.netty.util.internal.PlatformDependent;
import org.apache.activemq.artemis.api.core.Message;
@@ -37,6 +41,7 @@ import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.core.config.StoreConfiguration;
import org.apache.activemq.artemis.core.management.impl.QueueControlImpl;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.utils.RandomUtil;
import org.junit.Assert;
import org.junit.Assume;
@@ -45,6 +50,13 @@ import org.junit.Test;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.not;
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
import javax.management.openmbean.CompositeData;
/**
@@ -496,4 +508,157 @@ public class LargeMessageCompressTest extends LargeMessageTest {
public void testSendServerMessage() throws Exception {
// doesn't make sense as compressed
}
// https://issues.apache.org/jira/projects/ARTEMIS/issues/ARTEMIS-3751
@Test
public void testOverrideSize() throws Exception {
ActiveMQServer server = createServer(true, true);
server.start();
ConnectionFactory cf = new ActiveMQConnectionFactory("tcp://localhost:61616?minLargeMessageSize=" + (200 * 1024) + "&compressLargeMessage=true");
Connection connection = cf.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
connection.start();
Destination queue = session.createQueue("testQueue");
MessageProducer producer = session.createProducer(queue);
MessageConsumer consumer = session.createConsumer(queue);
// What we want is a message whose size is larger than the client's minLargeMessageSize (200 KiB here), while its compressed size is lower than that
// and at the same time greater than the server's minLargeMessageSize (100 KiB by default)
byte[] data = new byte[1024 * 300];
new DeflateGenerator(new Random(42), 2.0, 2.0).generate(data, 2);
assertCompressionSize(data, 100 * 1024, 200 * 1024);
BytesMessage outMessage = session.createBytesMessage();
outMessage.writeBytes(data);
producer.send(outMessage);
javax.jms.Message inMessage = consumer.receive(1000);
assertEqualsByteArrays(data, inMessage.getBody(byte[].class));
}
private void assertCompressionSize(byte[] data, int min, int max) {
byte[] output = new byte[data.length];
Deflater deflater = new Deflater();
deflater.setInput(data);
deflater.finish();
int compressed = deflater.deflate(output);
deflater.end();
assert compressed > min && compressed < max;
}
/**
* Generate compressible data.
* <p>
* Based on "SDGen: Mimicking Datasets for Content Generation in Storage
* Benchmarks" by Raúl Gracia-Tinedo et al. (https://www.usenix.org/node/188461)
* and https://github.com/jibsen/lzdatagen
*/
public static class DeflateGenerator {
private static final int MIN_LENGTH = 3;
private static final int MAX_LENGTH = 258;
private static final int NUM_LENGTH = MAX_LENGTH - MIN_LENGTH;
private static final int LENGTH_PER_CHUNK = 512;
private final Random rnd;
private final double dataExp;
private final double lengthExp;
public DeflateGenerator(Random rnd, double dataExp, double lengthExp) {
this.rnd = rnd;
this.dataExp = dataExp;
this.lengthExp = lengthExp;
}
private void nextBytes(byte[] buffer, int size) {
for (int i = 0; i < size; i++) {
buffer[i] = ((byte) ((double) 256 * Math.pow(rnd.nextDouble(), dataExp)));
}
}
private byte[] nextBytes(int size) {
byte[] buffer = new byte[size];
nextBytes(buffer, size);
return buffer;
}
private void nextLengthFrequencies(int[] frequencies) {
Arrays.fill(frequencies, 0);
for (int i = 0; i < LENGTH_PER_CHUNK; i++) {
int length = (int) ((double) frequencies.length * Math.pow(rnd.nextDouble(), lengthExp));
frequencies[length]++;
}
}
public void generate(byte[] result, double ratio) {
ByteBuffer generated = generate(result.length, ratio);
generated.get(result);
}
public ByteBuffer generate(int size, double ratio) {
ByteBuffer result = ByteBuffer.allocate(size);
byte[] buffer = new byte[MAX_LENGTH];
int[] frequencies = new int[NUM_LENGTH];
int length = 0;
int i = 0;
boolean repeat = false;
while (i < size) {
while (frequencies[length] == 0) {
if (length == 0) {
nextBytes(buffer, MAX_LENGTH);
nextLengthFrequencies(frequencies);
length = NUM_LENGTH;
}
length--;
}
int len = length + MIN_LENGTH;
frequencies[length]--;
if (len > size - i) {
len = size - i;
}
if (rnd.nextDouble() < 1.0 / ratio) {
result.put(nextBytes(len));
repeat = false;
} else {
if (repeat) {
result.put(nextBytes(1));
i++;
}
result.put(buffer, 0, len);
repeat = true;
}
i += len;
}
return result.flip();
}
}
}
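
For reference, a minimal standalone sketch showing how the DeflateGenerator helper added above can be exercised on its own, mirroring the Deflater check in assertCompressionSize; the class name is a placeholder, and DeflateGenerator is assumed to be available on the classpath (it is added as a public static nested class of LargeMessageCompressTest):

import java.util.Random;
import java.util.zip.Deflater;
// DeflateGenerator is the nested helper class added above; import it from
// LargeMessageCompressTest or copy it alongside this sketch.

public class DeflateGeneratorSketch {
   public static void main(String[] args) {
      // Generate 300 KiB of data aimed at roughly a 2:1 deflate ratio, as the new test does.
      byte[] data = new byte[300 * 1024];
      new DeflateGenerator(new Random(42), 2.0, 2.0).generate(data, 2);

      // Deflate it once to check the compressed size, the same way assertCompressionSize does.
      Deflater deflater = new Deflater();
      deflater.setInput(data);
      deflater.finish();
      byte[] out = new byte[data.length];
      int compressed = deflater.deflate(out);
      deflater.end();

      System.out.println("uncompressed=" + data.length + " bytes, deflated=" + compressed + " bytes");
   }
}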