From bfe6b70c5ad63c6b1231f73e3ff8d9dbf0ecc6a8 Mon Sep 17 00:00:00 2001 From: Francesco Nigro Date: Fri, 7 Jun 2019 18:06:12 +0200 Subject: [PATCH] ARTEMIS-2374 JournalStorageManager::addBytesToLargeMessage leaks ByteBuffer --- artemis-core-client/pom.xml | 4 +- artemis-server/pom.xml | 6 ++ .../impl/journal/JournalStorageManager.java | 10 +-- .../journal/JournalStorageManagerTest.java | 62 ++++++++++++++++--- pom.xml | 1 + 5 files changed, 70 insertions(+), 13 deletions(-) diff --git a/artemis-core-client/pom.xml b/artemis-core-client/pom.xml index ac01cd8428..538cb76116 100644 --- a/artemis-core-client/pom.xml +++ b/artemis-core-client/pom.xml @@ -78,8 +78,8 @@ org.hamcrest - hamcrest-core - 1.3 + hamcrest + ${hamcrest.version} test diff --git a/artemis-server/pom.xml b/artemis-server/pom.xml index ba9b4ceb7c..c8bb63b966 100644 --- a/artemis-server/pom.xml +++ b/artemis-server/pom.xml @@ -198,6 +198,12 @@ mockito-core test + + org.hamcrest + hamcrest + ${hamcrest.version} + test + diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java index 555da5210d..1d5fd2926a 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java @@ -75,7 +75,7 @@ public class JournalStorageManager extends AbstractJournalStorageManager { protected SequentialFileFactory bindingsFF; - SequentialFileFactory largeMessagesFactory; + protected SequentialFileFactory largeMessagesFactory; protected Journal originalMessageJournal; @@ -818,7 +818,7 @@ public class JournalStorageManager extends AbstractJournalStorageManager { file.position(file.size()); if (bytes.byteBuf() != null && bytes.byteBuf().nioBufferCount() == 1) { final ByteBuffer nioBytes = bytes.byteBuf().internalNioBuffer(bytes.readerIndex(), bytes.readableBytes()); - file.writeDirect(nioBytes, false); + file.blockingWriteDirect(nioBytes, false, false); if (isReplicated()) { //copy defensively bytes @@ -843,8 +843,10 @@ public class JournalStorageManager extends AbstractJournalStorageManager { readLock(); try { file.position(file.size()); - - file.writeDirect(ByteBuffer.wrap(bytes), false); + //that's an additional precaution to avoid ByteBuffer to be pooled: + //NIOSequentialFileFactory doesn't pool heap ByteBuffer, but better to make evident + //the intention by calling the right method + file.blockingWriteDirect(ByteBuffer.wrap(bytes), false, false); if (isReplicated()) { replicator.largeMessageWrite(messageId, bytes); diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManagerTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManagerTest.java index 861b5e08ac..a0580941be 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManagerTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManagerTest.java @@ -15,6 +15,7 @@ */ package org.apache.activemq.artemis.core.persistence.impl.journal; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collection; import java.util.UUID; @@ -26,8 +27,11 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Stream; +import org.apache.activemq.artemis.api.core.ActiveMQBuffer; +import org.apache.activemq.artemis.api.core.ActiveMQBuffers; import org.apache.activemq.artemis.api.core.SimpleString; -import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl; +import org.apache.activemq.artemis.core.config.Configuration; +import org.apache.activemq.artemis.core.io.SequentialFile; import org.apache.activemq.artemis.core.io.aio.AIOSequentialFileFactory; import org.apache.activemq.artemis.core.message.impl.CoreMessage; import org.apache.activemq.artemis.core.paging.PagingManager; @@ -36,6 +40,7 @@ import org.apache.activemq.artemis.core.replication.ReplicationManager; import org.apache.activemq.artemis.core.server.JournalType; import org.apache.activemq.artemis.core.server.LargeServerMessage; import org.apache.activemq.artemis.core.server.impl.JournalLoader; +import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; import org.apache.activemq.artemis.utils.ExecutorFactory; import org.apache.activemq.artemis.utils.actors.OrderedExecutorFactory; import org.junit.AfterClass; @@ -46,21 +51,22 @@ import org.junit.runner.RunWith; import org.junit.runners.Parameterized; import static java.util.stream.Collectors.toList; +import static org.hamcrest.Matchers.is; import static org.junit.Assume.assumeTrue; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.when; @RunWith(Parameterized.class) -public class JournalStorageManagerTest { +public class JournalStorageManagerTest extends ActiveMQTestBase { @Parameterized.Parameter public JournalType journalType; @Parameterized.Parameters(name = "journal type={0}") public static Collection getParams() { - final JournalType[] values = JournalType.values(); return Stream.of(JournalType.values()) .map(journalType -> new Object[]{journalType}) .collect(toList()); @@ -89,14 +95,14 @@ public class JournalStorageManagerTest { * Test of fixJournalFileSize method, of class JournalStorageManager. */ @Test - public void testFixJournalFileSize() { + public void testFixJournalFileSize() throws Exception { if (journalType == JournalType.ASYNCIO) { assumeTrue("AIO is not supported on this platform", AIOSequentialFileFactory.isSupported()); } - final ConfigurationImpl configuration = new ConfigurationImpl().setJournalType(journalType); + final Configuration configuration = createDefaultInVMConfig().setJournalType(journalType); final ExecutorFactory executorFactory = new OrderedExecutorFactory(executor); final ExecutorFactory ioExecutorFactory = new OrderedExecutorFactory(ioExecutor); - final JournalStorageManager manager = spy(new JournalStorageManager(configuration, null, executorFactory, null, ioExecutorFactory)); + final JournalStorageManager manager = new JournalStorageManager(configuration, null, executorFactory, null, ioExecutorFactory); Assert.assertEquals(4096, manager.fixJournalFileSize(1024, 4096)); Assert.assertEquals(4096, manager.fixJournalFileSize(4098, 4096)); Assert.assertEquals(8192, manager.fixJournalFileSize(8192, 4096)); @@ -107,7 +113,7 @@ public class JournalStorageManagerTest { if (journalType == JournalType.ASYNCIO) { assumeTrue("AIO is not supported on this platform", AIOSequentialFileFactory.isSupported()); } - final ConfigurationImpl configuration = new ConfigurationImpl().setJournalType(journalType); + final Configuration configuration = createDefaultInVMConfig().setJournalType(journalType); final ExecutorFactory executorFactory = new OrderedExecutorFactory(executor); final ExecutorFactory ioExecutorFactory = new OrderedExecutorFactory(ioExecutor); final JournalStorageManager manager = spy(new JournalStorageManager(configuration, null, executorFactory, null, ioExecutorFactory)); @@ -164,4 +170,46 @@ public class JournalStorageManagerTest { stoppedReplication.get(); } + @Test + public void testAddBytesToLargeMessageNotLeakingByteBuffer() throws Exception { + if (journalType == JournalType.ASYNCIO) { + assumeTrue("AIO is not supported on this platform", AIOSequentialFileFactory.isSupported()); + } + final Configuration configuration = createDefaultInVMConfig().setJournalType(journalType); + final ExecutorFactory executorFactory = new OrderedExecutorFactory(executor); + final ExecutorFactory ioExecutorFactory = new OrderedExecutorFactory(ioExecutor); + final JournalStorageManager manager = new JournalStorageManager(configuration, null, executorFactory, null, ioExecutorFactory); + manager.largeMessagesFactory = spy(manager.largeMessagesFactory); + manager.start(); + manager.loadBindingJournal(new ArrayList<>(), new ArrayList<>(), new ArrayList<>()); + final PostOffice postOffice = mock(PostOffice.class); + final JournalLoader journalLoader = mock(JournalLoader.class); + manager.loadMessageJournal(postOffice, null, null, null, null, null, null, journalLoader); + final long id = manager.generateID() + 1; + final SequentialFile file = manager.createFileForLargeMessage(id, false); + try { + file.open(); + doAnswer(invocation -> { + Assert.fail("No buffer should leak into the factory pool while writing into a large message"); + return invocation.callRealMethod(); + }).when(manager.largeMessagesFactory).releaseBuffer(any(ByteBuffer.class)); + final int size = 100; + final ByteBuffer byteBuffer = ByteBuffer.allocateDirect(size); + final ActiveMQBuffer directBuffer = ActiveMQBuffers.wrappedBuffer(byteBuffer); + directBuffer.writerIndex(size); + long fileSize = file.size(); + manager.addBytesToLargeMessage(file, 1, directBuffer); + Assert.assertThat(file.size(), is(fileSize + size)); + fileSize = file.size(); + final ActiveMQBuffer heapBuffer = ActiveMQBuffers.wrappedBuffer(new byte[size]); + heapBuffer.writerIndex(size); + manager.addBytesToLargeMessage(file, 1, heapBuffer); + Assert.assertThat(file.size(), is(fileSize + size)); + } finally { + manager.stop(); + file.close(); + file.delete(); + } + } + } diff --git a/pom.xml b/pom.xml index 2449d7b3d9..98021cde07 100644 --- a/pom.xml +++ b/pom.xml @@ -117,6 +117,7 @@ 0.7.9 2.4 1.1.4 + 2.1 2.4.3