This commit is contained in:
Clebert Suconic 2020-06-01 11:12:37 -04:00
commit 86360a2846
1 changed files with 45 additions and 0 deletions

View File

@ -24,6 +24,7 @@ import java.io.InputStream;
import java.io.OutputStream;
import java.util.concurrent.atomic.AtomicLong;
import io.netty.util.internal.PlatformDependent;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
@ -38,8 +39,12 @@ import org.apache.activemq.artemis.core.management.impl.QueueControlImpl;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.utils.RandomUtil;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Test;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.not;
import javax.management.openmbean.CompositeData;
/**
@ -129,6 +134,46 @@ public class LargeMessageCompressTest extends LargeMessageTest {
validateNoFilesOnLargeDir();
}
@Test
public void testNoDirectByteBufLeaksOnLargeMessageCompression() throws Exception {
Assume.assumeThat(PlatformDependent.usedDirectMemory(), not(equalTo(Long.valueOf(-1))));
final int messageSize = (int) (3.5 * ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE);
ActiveMQServer server = createServer(true, isNetty());
server.start();
ClientSessionFactory sf = createSessionFactory(locator);
ClientSession session = addClientSession(sf.createSession(false, false, false));
session.createQueue(new QueueConfiguration(ADDRESS).setAddress(ADDRESS).setDurable(false).setTemporary(true));
ClientProducer producer = session.createProducer(ADDRESS);
Message clientFile = createLargeClientMessageStreaming(session, messageSize, true);
producer.send(clientFile);
session.commit();
session.start();
ClientConsumer consumer = session.createConsumer(ADDRESS);
final long usedDirectMemoryBeforeReceive = PlatformDependent.usedDirectMemory();
ClientMessage msg1 = consumer.receive(1000);
Assert.assertNotNull(msg1);
final long usedDirectMemoryAfterReceive = PlatformDependent.usedDirectMemory();
Assert.assertEquals("large message compression is leaking some Netty direct ByteBuff",
usedDirectMemoryBeforeReceive, usedDirectMemoryAfterReceive);
msg1.acknowledge();
session.commit();
consumer.close();
session.close();
}
@Test
public void testLargeMessageCompression() throws Exception {
final int messageSize = (int) (3.5 * ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE);