This commit is contained in:
Clebert Suconic 2023-06-14 17:21:38 -04:00
commit 1e356eb27c
2 changed files with 58 additions and 0 deletions

View File

@ -443,6 +443,10 @@ public class ClientProducerImpl implements ClientProducerInternal {
deflaterReader = new DeflaterReader(inputStreamParameter, messageSize); deflaterReader = new DeflaterReader(inputStreamParameter, messageSize);
deflaterReader.setLevel(session.getCompressionLevel()); deflaterReader.setLevel(session.getCompressionLevel());
input = deflaterReader; input = deflaterReader;
} else if (msgI.getBooleanProperty(Message.HDR_LARGE_COMPRESSED)) {
//This needs to be false if we do not intend to compress the message
//and the header already exists
msgI.putBooleanProperty(Message.HDR_LARGE_COMPRESSED, false);
} }
long totalSize = 0; long totalSize = 0;

View File

@ -31,6 +31,7 @@ import java.util.zip.Deflater;
import io.netty.util.internal.PlatformDependent; import io.netty.util.internal.PlatformDependent;
import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.QueueConfiguration; import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ActiveMQClient; import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
import org.apache.activemq.artemis.api.core.client.ClientConsumer; import org.apache.activemq.artemis.api.core.client.ClientConsumer;
@ -513,6 +514,59 @@ public class LargeMessageCompressTest extends LargeMessageTest {
validateNoFilesOnLargeDir(); validateNoFilesOnLargeDir();
} }
@Test
public void testPreviouslyCompressedMessageCleanup() throws Exception {
final int messageSize = 1024 * 1024;
ActiveMQServer server = createServer(true, isNetty());
server.start();
ClientSessionFactory sf1 = createSessionFactory(locator);
ClientSession session1 = addClientSession(sf1.createSession(false, true, true));
session1.createQueue(new QueueConfiguration(ADDRESS).setRoutingType(RoutingType.ANYCAST));
ClientProducer producer = session1.createProducer(ADDRESS);
ServerLocator locator2 = ActiveMQClient.createServerLocator("vm://0");
locator2.setCompressLargeMessage(false);
ClientSessionFactory sf2 = locator2.createSessionFactory();
ClientSession session2 = sf2.createSession(false, true, true);
ClientConsumer consumer = session2.createConsumer(ADDRESS);
ClientProducer producer2 = session2.createProducer(ADDRESS);
session2.start();
byte[] payload = new byte[messageSize];
byte[] response = new byte[messageSize];
for (int i = 0; i < payload.length; i++) {
payload[i] = RandomUtil.randomByte();
}
ClientMessage message = session1.createMessage(true);
message.getBodyBuffer().writeBytes(payload);
producer.send(message);
message = consumer.receive();
assertTrue(message.getBooleanProperty(Message.HDR_LARGE_COMPRESSED));
message.getBodyBuffer().readBytes(response);
message.getBodyBuffer().writeBytes(response);
producer2.send(message);
message = consumer.receive();
assertFalse(message.getBooleanProperty(Message.HDR_LARGE_COMPRESSED));
message.getBodyBuffer().readBytes(payload);
message.getBodySize();
assertTrue(Arrays.equals(payload, response));
session1.close();
session2.close();
sf1.close();
locator.close();
sf2.close();
locator2.close();
}
@Test @Test
public void testLargeMessageCompressionLevel() throws Exception { public void testLargeMessageCompressionLevel() throws Exception {