ARTEMIS-4185 - Revision on sending already compressed messages

This commit is contained in:
a181321 2023-11-22 10:19:14 +01:00 committed by clebertsuconic
parent fbfe81eb4d
commit f56595b89b
3 changed files with 43 additions and 37 deletions

View File

@ -646,6 +646,7 @@ public final class ClientConsumerImpl implements ClientConsumerInternal {
qbuff.readBytes(body);
largeMessage.setLargeMessageController(new CompressedLargeMessageControllerImpl(currentLargeMessageController));
currentLargeMessageController.addPacket(body, body.length, false);
largeMessage.putBooleanProperty(Message.HDR_LARGE_COMPRESSED, false);
handleRegularMessage(largeMessage);
}
@ -674,6 +675,7 @@ public final class ClientConsumerImpl implements ClientConsumerInternal {
if (clientLargeMessage.isCompressed()) {
clientLargeMessage.setLargeMessageController(new CompressedLargeMessageControllerImpl(currentLargeMessageController));
clientLargeMessage.putBooleanProperty(Message.HDR_LARGE_COMPRESSED, false);
} else {
clientLargeMessage.setLargeMessageController(currentLargeMessageController);
}

View File

@ -443,10 +443,6 @@ public class ClientProducerImpl implements ClientProducerInternal {
deflaterReader = new DeflaterReader(inputStreamParameter, messageSize);
deflaterReader.setLevel(session.getCompressionLevel());
input = deflaterReader;
} else if (msgI.getBooleanProperty(Message.HDR_LARGE_COMPRESSED)) {
//This needs to be false if we do not intend to compress the message
//and the header already exists
msgI.putBooleanProperty(Message.HDR_LARGE_COMPRESSED, false);
}
long totalSize = 0;

View File

@ -29,6 +29,7 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.zip.Deflater;
import io.netty.util.internal.PlatformDependent;
import org.apache.activemq.artemis.api.core.ICoreMessage;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
@ -518,52 +519,59 @@ public class LargeMessageCompressTest extends LargeMessageTest {
public void testPreviouslyCompressedMessageCleanup() throws Exception {
final int messageSize = 1024 * 1024;
byte[] payload = new byte[messageSize];
byte[] response = new byte[messageSize];
ActiveMQServer server = createServer(true, isNetty());
server.start();
ClientSessionFactory sf1 = createSessionFactory(locator);
ClientSession session1 = addClientSession(sf1.createSession(false, true, true));
session1.createQueue(new QueueConfiguration(ADDRESS).setRoutingType(RoutingType.ANYCAST));
ClientProducer producer = session1.createProducer(ADDRESS);
ServerLocator locator2 = ActiveMQClient.createServerLocator("vm://0");
locator2.setCompressLargeMessage(false);
ClientSessionFactory sf2 = locator2.createSessionFactory();
ClientSession session2 = sf2.createSession(false, true, true);
ClientConsumer consumer = session2.createConsumer(ADDRESS);
ClientProducer producer2 = session2.createProducer(ADDRESS);
session2.start();
byte[] payload = new byte[messageSize];
byte[] response = new byte[messageSize];
server.createQueue(new QueueConfiguration(ADDRESS).setRoutingType(RoutingType.ANYCAST));
for (int i = 0; i < payload.length; i++) {
payload[i] = RandomUtil.randomByte();
}
ClientMessage message = session1.createMessage(true);
message.getBodyBuffer().writeBytes(payload);
producer.send(message);
try (ClientSessionFactory sf = locator.createSessionFactory();
ClientSession session = sf.createSession(true, true);
ClientProducer producer = session.createProducer(ADDRESS)) {
message = consumer.receive();
assertTrue(message.getBooleanProperty(Message.HDR_LARGE_COMPRESSED));
ClientMessage message = session.createMessage(true);
message.getBodyBuffer().writeBytes(payload);
assertNull(message.getAnnotation(Message.HDR_LARGE_COMPRESSED));
message.getBodyBuffer().readBytes(response);
message.getBodyBuffer().writeBytes(response);
producer2.send(message);
producer.send(message);
}
message = consumer.receive();
assertFalse(message.getBooleanProperty(Message.HDR_LARGE_COMPRESSED));
ServerLocator locator2 = createFactory(isNetty());
locator2.setCompressLargeMessage(false);
locator2.setMinLargeMessageSize(1024);
message.getBodyBuffer().readBytes(payload);
message.getBodySize();
assertTrue(Arrays.equals(payload, response));
try (ClientSessionFactory sf = locator2.createSessionFactory();
ClientSession session = sf.createSession(true, true);
ClientConsumer consumer = session.createConsumer(ADDRESS);
ClientProducer producer = session.createProducer(ADDRESS)) {
ClientMessage message = session.createMessage(true);
ICoreMessage serverMessage = server.locateQueue(ADDRESS).browserIterator().next().getMessage().copy().toCore();
message.moveHeadersAndProperties(serverMessage);
message.getBodyBuffer().writeBytes(serverMessage.getReadOnlyBodyBuffer(), serverMessage.getBodyBufferSize());
assertTrue(message.getBooleanProperty(Message.HDR_LARGE_COMPRESSED));
producer.send(message);
session.start();
for (int i = 0; i < 2; i++) {
message = consumer.receive(2000);
assertNotNull(message);
message.getBodyBuffer().readBytes(response);
assertTrue(Arrays.equals(payload, response));
message.acknowledge();
}
}
session1.close();
session2.close();
sf1.close();
locator.close();
sf2.close();
locator2.close();
}