From 6db63acee2355a54bd17dc9f63bd9d65f12a3f3b Mon Sep 17 00:00:00 2001 From: Francesco Nigro Date: Mon, 1 Jun 2020 12:50:25 +0200 Subject: [PATCH] ARTEMIS-2785 test Netty direct ByteBuf memory leak due to compression --- .../client/LargeMessageCompressTest.java | 45 +++++++++++++++++++ 1 file changed, 45 insertions(+) diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageCompressTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageCompressTest.java index 573ac1b5fa..e2eae05930 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageCompressTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageCompressTest.java @@ -24,6 +24,7 @@ import java.io.InputStream; import java.io.OutputStream; import java.util.concurrent.atomic.AtomicLong; +import io.netty.util.internal.PlatformDependent; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.QueueConfiguration; import org.apache.activemq.artemis.api.core.client.ActiveMQClient; @@ -38,8 +39,12 @@ import org.apache.activemq.artemis.core.management.impl.QueueControlImpl; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.utils.RandomUtil; import org.junit.Assert; +import org.junit.Assume; import org.junit.Test; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.not; + import javax.management.openmbean.CompositeData; /** @@ -129,6 +134,46 @@ public class LargeMessageCompressTest extends LargeMessageTest { validateNoFilesOnLargeDir(); } + @Test + public void testNoDirectByteBufLeaksOnLargeMessageCompression() throws Exception { + Assume.assumeThat(PlatformDependent.usedDirectMemory(), not(equalTo(Long.valueOf(-1)))); + final int messageSize = (int) (3.5 * ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE); + + ActiveMQServer server = createServer(true, isNetty()); + + server.start(); + + ClientSessionFactory sf = createSessionFactory(locator); + + ClientSession session = addClientSession(sf.createSession(false, false, false)); + + session.createQueue(new QueueConfiguration(ADDRESS).setAddress(ADDRESS).setDurable(false).setTemporary(true)); + + ClientProducer producer = session.createProducer(ADDRESS); + + Message clientFile = createLargeClientMessageStreaming(session, messageSize, true); + + producer.send(clientFile); + + session.commit(); + + session.start(); + + ClientConsumer consumer = session.createConsumer(ADDRESS); + final long usedDirectMemoryBeforeReceive = PlatformDependent.usedDirectMemory(); + ClientMessage msg1 = consumer.receive(1000); + Assert.assertNotNull(msg1); + final long usedDirectMemoryAfterReceive = PlatformDependent.usedDirectMemory(); + Assert.assertEquals("large message compression is leaking some Netty direct ByteBuff", + usedDirectMemoryBeforeReceive, usedDirectMemoryAfterReceive); + msg1.acknowledge(); + session.commit(); + + consumer.close(); + + session.close(); + } + @Test public void testLargeMessageCompression() throws Exception { final int messageSize = (int) (3.5 * ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE);