ARTEMIS-2374 JournalStorageManager::addBytesToLargeMessage leaks ByteBuffer

This commit is contained in:
Francesco Nigro 2019-06-07 18:06:12 +02:00
parent 42f0157765
commit bfe6b70c5a
5 changed files with 70 additions and 13 deletions

View File

@ -78,8 +78,8 @@
</dependency>
<dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-core</artifactId>
<version>1.3</version>
<artifactId>hamcrest</artifactId>
<version>${hamcrest.version}</version>
<scope>test</scope>
</dependency>
<dependency>

View File

@ -198,6 +198,12 @@
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest</artifactId>
<version>${hamcrest.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
<profiles>

View File

@ -75,7 +75,7 @@ public class JournalStorageManager extends AbstractJournalStorageManager {
protected SequentialFileFactory bindingsFF;
SequentialFileFactory largeMessagesFactory;
protected SequentialFileFactory largeMessagesFactory;
protected Journal originalMessageJournal;
@ -818,7 +818,7 @@ public class JournalStorageManager extends AbstractJournalStorageManager {
file.position(file.size());
if (bytes.byteBuf() != null && bytes.byteBuf().nioBufferCount() == 1) {
final ByteBuffer nioBytes = bytes.byteBuf().internalNioBuffer(bytes.readerIndex(), bytes.readableBytes());
file.writeDirect(nioBytes, false);
file.blockingWriteDirect(nioBytes, false, false);
if (isReplicated()) {
//copy defensively bytes
@ -843,8 +843,10 @@ public class JournalStorageManager extends AbstractJournalStorageManager {
readLock();
try {
file.position(file.size());
file.writeDirect(ByteBuffer.wrap(bytes), false);
//that's an additional precaution to avoid ByteBuffer to be pooled:
//NIOSequentialFileFactory doesn't pool heap ByteBuffer, but better to make evident
//the intention by calling the right method
file.blockingWriteDirect(ByteBuffer.wrap(bytes), false, false);
if (isReplicated()) {
replicator.largeMessageWrite(messageId, bytes);

View File

@ -15,6 +15,7 @@
*/
package org.apache.activemq.artemis.core.persistence.impl.journal;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.UUID;
@ -26,8 +27,11 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Stream;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.io.SequentialFile;
import org.apache.activemq.artemis.core.io.aio.AIOSequentialFileFactory;
import org.apache.activemq.artemis.core.message.impl.CoreMessage;
import org.apache.activemq.artemis.core.paging.PagingManager;
@ -36,6 +40,7 @@ import org.apache.activemq.artemis.core.replication.ReplicationManager;
import org.apache.activemq.artemis.core.server.JournalType;
import org.apache.activemq.artemis.core.server.LargeServerMessage;
import org.apache.activemq.artemis.core.server.impl.JournalLoader;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.utils.ExecutorFactory;
import org.apache.activemq.artemis.utils.actors.OrderedExecutorFactory;
import org.junit.AfterClass;
@ -46,21 +51,22 @@ import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import static java.util.stream.Collectors.toList;
import static org.hamcrest.Matchers.is;
import static org.junit.Assume.assumeTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
@RunWith(Parameterized.class)
public class JournalStorageManagerTest {
public class JournalStorageManagerTest extends ActiveMQTestBase {
@Parameterized.Parameter
public JournalType journalType;
@Parameterized.Parameters(name = "journal type={0}")
public static Collection<Object[]> getParams() {
final JournalType[] values = JournalType.values();
return Stream.of(JournalType.values())
.map(journalType -> new Object[]{journalType})
.collect(toList());
@ -89,14 +95,14 @@ public class JournalStorageManagerTest {
* Test of fixJournalFileSize method, of class JournalStorageManager.
*/
@Test
public void testFixJournalFileSize() {
public void testFixJournalFileSize() throws Exception {
if (journalType == JournalType.ASYNCIO) {
assumeTrue("AIO is not supported on this platform", AIOSequentialFileFactory.isSupported());
}
final ConfigurationImpl configuration = new ConfigurationImpl().setJournalType(journalType);
final Configuration configuration = createDefaultInVMConfig().setJournalType(journalType);
final ExecutorFactory executorFactory = new OrderedExecutorFactory(executor);
final ExecutorFactory ioExecutorFactory = new OrderedExecutorFactory(ioExecutor);
final JournalStorageManager manager = spy(new JournalStorageManager(configuration, null, executorFactory, null, ioExecutorFactory));
final JournalStorageManager manager = new JournalStorageManager(configuration, null, executorFactory, null, ioExecutorFactory);
Assert.assertEquals(4096, manager.fixJournalFileSize(1024, 4096));
Assert.assertEquals(4096, manager.fixJournalFileSize(4098, 4096));
Assert.assertEquals(8192, manager.fixJournalFileSize(8192, 4096));
@ -107,7 +113,7 @@ public class JournalStorageManagerTest {
if (journalType == JournalType.ASYNCIO) {
assumeTrue("AIO is not supported on this platform", AIOSequentialFileFactory.isSupported());
}
final ConfigurationImpl configuration = new ConfigurationImpl().setJournalType(journalType);
final Configuration configuration = createDefaultInVMConfig().setJournalType(journalType);
final ExecutorFactory executorFactory = new OrderedExecutorFactory(executor);
final ExecutorFactory ioExecutorFactory = new OrderedExecutorFactory(ioExecutor);
final JournalStorageManager manager = spy(new JournalStorageManager(configuration, null, executorFactory, null, ioExecutorFactory));
@ -164,4 +170,46 @@ public class JournalStorageManagerTest {
stoppedReplication.get();
}
@Test
public void testAddBytesToLargeMessageNotLeakingByteBuffer() throws Exception {
if (journalType == JournalType.ASYNCIO) {
assumeTrue("AIO is not supported on this platform", AIOSequentialFileFactory.isSupported());
}
final Configuration configuration = createDefaultInVMConfig().setJournalType(journalType);
final ExecutorFactory executorFactory = new OrderedExecutorFactory(executor);
final ExecutorFactory ioExecutorFactory = new OrderedExecutorFactory(ioExecutor);
final JournalStorageManager manager = new JournalStorageManager(configuration, null, executorFactory, null, ioExecutorFactory);
manager.largeMessagesFactory = spy(manager.largeMessagesFactory);
manager.start();
manager.loadBindingJournal(new ArrayList<>(), new ArrayList<>(), new ArrayList<>());
final PostOffice postOffice = mock(PostOffice.class);
final JournalLoader journalLoader = mock(JournalLoader.class);
manager.loadMessageJournal(postOffice, null, null, null, null, null, null, journalLoader);
final long id = manager.generateID() + 1;
final SequentialFile file = manager.createFileForLargeMessage(id, false);
try {
file.open();
doAnswer(invocation -> {
Assert.fail("No buffer should leak into the factory pool while writing into a large message");
return invocation.callRealMethod();
}).when(manager.largeMessagesFactory).releaseBuffer(any(ByteBuffer.class));
final int size = 100;
final ByteBuffer byteBuffer = ByteBuffer.allocateDirect(size);
final ActiveMQBuffer directBuffer = ActiveMQBuffers.wrappedBuffer(byteBuffer);
directBuffer.writerIndex(size);
long fileSize = file.size();
manager.addBytesToLargeMessage(file, 1, directBuffer);
Assert.assertThat(file.size(), is(fileSize + size));
fileSize = file.size();
final ActiveMQBuffer heapBuffer = ActiveMQBuffers.wrappedBuffer(new byte[size]);
heapBuffer.writerIndex(size);
manager.addBytesToLargeMessage(file, 1, heapBuffer);
Assert.assertThat(file.size(), is(fileSize + size));
} finally {
manager.stop();
file.close();
file.delete();
}
}
}

View File

@ -117,6 +117,7 @@
<version.org.jacoco.plugin>0.7.9</version.org.jacoco.plugin>
<version.maven.jar.plugin>2.4</version.maven.jar.plugin>
<version.micrometer>1.1.4</version.micrometer>
<hamcrest.version>2.1</hamcrest.version>
<!-- used on tests -->
<groovy.version>2.4.3</groovy.version>