This closes #2696
This commit is contained in:
commit
c1771a5aed
|
@ -78,8 +78,8 @@
|
||||||
</dependency>
|
</dependency>
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>org.hamcrest</groupId>
|
<groupId>org.hamcrest</groupId>
|
||||||
<artifactId>hamcrest-core</artifactId>
|
<artifactId>hamcrest</artifactId>
|
||||||
<version>1.3</version>
|
<version>${hamcrest.version}</version>
|
||||||
<scope>test</scope>
|
<scope>test</scope>
|
||||||
</dependency>
|
</dependency>
|
||||||
<dependency>
|
<dependency>
|
||||||
|
|
|
@ -198,6 +198,12 @@
|
||||||
<artifactId>mockito-core</artifactId>
|
<artifactId>mockito-core</artifactId>
|
||||||
<scope>test</scope>
|
<scope>test</scope>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.hamcrest</groupId>
|
||||||
|
<artifactId>hamcrest</artifactId>
|
||||||
|
<version>${hamcrest.version}</version>
|
||||||
|
<scope>test</scope>
|
||||||
|
</dependency>
|
||||||
</dependencies>
|
</dependencies>
|
||||||
|
|
||||||
<profiles>
|
<profiles>
|
||||||
|
|
|
@ -75,7 +75,7 @@ public class JournalStorageManager extends AbstractJournalStorageManager {
|
||||||
|
|
||||||
protected SequentialFileFactory bindingsFF;
|
protected SequentialFileFactory bindingsFF;
|
||||||
|
|
||||||
SequentialFileFactory largeMessagesFactory;
|
protected SequentialFileFactory largeMessagesFactory;
|
||||||
|
|
||||||
protected Journal originalMessageJournal;
|
protected Journal originalMessageJournal;
|
||||||
|
|
||||||
|
@ -818,7 +818,7 @@ public class JournalStorageManager extends AbstractJournalStorageManager {
|
||||||
file.position(file.size());
|
file.position(file.size());
|
||||||
if (bytes.byteBuf() != null && bytes.byteBuf().nioBufferCount() == 1) {
|
if (bytes.byteBuf() != null && bytes.byteBuf().nioBufferCount() == 1) {
|
||||||
final ByteBuffer nioBytes = bytes.byteBuf().internalNioBuffer(bytes.readerIndex(), bytes.readableBytes());
|
final ByteBuffer nioBytes = bytes.byteBuf().internalNioBuffer(bytes.readerIndex(), bytes.readableBytes());
|
||||||
file.writeDirect(nioBytes, false);
|
file.blockingWriteDirect(nioBytes, false, false);
|
||||||
|
|
||||||
if (isReplicated()) {
|
if (isReplicated()) {
|
||||||
//copy defensively bytes
|
//copy defensively bytes
|
||||||
|
@ -843,8 +843,10 @@ public class JournalStorageManager extends AbstractJournalStorageManager {
|
||||||
readLock();
|
readLock();
|
||||||
try {
|
try {
|
||||||
file.position(file.size());
|
file.position(file.size());
|
||||||
|
//that's an additional precaution to avoid ByteBuffer to be pooled:
|
||||||
file.writeDirect(ByteBuffer.wrap(bytes), false);
|
//NIOSequentialFileFactory doesn't pool heap ByteBuffer, but better to make evident
|
||||||
|
//the intention by calling the right method
|
||||||
|
file.blockingWriteDirect(ByteBuffer.wrap(bytes), false, false);
|
||||||
|
|
||||||
if (isReplicated()) {
|
if (isReplicated()) {
|
||||||
replicator.largeMessageWrite(messageId, bytes);
|
replicator.largeMessageWrite(messageId, bytes);
|
||||||
|
|
|
@ -15,6 +15,7 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.activemq.artemis.core.persistence.impl.journal;
|
package org.apache.activemq.artemis.core.persistence.impl.journal;
|
||||||
|
|
||||||
|
import java.nio.ByteBuffer;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.UUID;
|
import java.util.UUID;
|
||||||
|
@ -26,8 +27,11 @@ import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.atomic.AtomicReference;
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
import java.util.stream.Stream;
|
import java.util.stream.Stream;
|
||||||
|
|
||||||
|
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
||||||
|
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
|
||||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||||
import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl;
|
import org.apache.activemq.artemis.core.config.Configuration;
|
||||||
|
import org.apache.activemq.artemis.core.io.SequentialFile;
|
||||||
import org.apache.activemq.artemis.core.io.aio.AIOSequentialFileFactory;
|
import org.apache.activemq.artemis.core.io.aio.AIOSequentialFileFactory;
|
||||||
import org.apache.activemq.artemis.core.message.impl.CoreMessage;
|
import org.apache.activemq.artemis.core.message.impl.CoreMessage;
|
||||||
import org.apache.activemq.artemis.core.paging.PagingManager;
|
import org.apache.activemq.artemis.core.paging.PagingManager;
|
||||||
|
@ -36,6 +40,7 @@ import org.apache.activemq.artemis.core.replication.ReplicationManager;
|
||||||
import org.apache.activemq.artemis.core.server.JournalType;
|
import org.apache.activemq.artemis.core.server.JournalType;
|
||||||
import org.apache.activemq.artemis.core.server.LargeServerMessage;
|
import org.apache.activemq.artemis.core.server.LargeServerMessage;
|
||||||
import org.apache.activemq.artemis.core.server.impl.JournalLoader;
|
import org.apache.activemq.artemis.core.server.impl.JournalLoader;
|
||||||
|
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
|
||||||
import org.apache.activemq.artemis.utils.ExecutorFactory;
|
import org.apache.activemq.artemis.utils.ExecutorFactory;
|
||||||
import org.apache.activemq.artemis.utils.actors.OrderedExecutorFactory;
|
import org.apache.activemq.artemis.utils.actors.OrderedExecutorFactory;
|
||||||
import org.junit.AfterClass;
|
import org.junit.AfterClass;
|
||||||
|
@ -46,21 +51,22 @@ import org.junit.runner.RunWith;
|
||||||
import org.junit.runners.Parameterized;
|
import org.junit.runners.Parameterized;
|
||||||
|
|
||||||
import static java.util.stream.Collectors.toList;
|
import static java.util.stream.Collectors.toList;
|
||||||
|
import static org.hamcrest.Matchers.is;
|
||||||
import static org.junit.Assume.assumeTrue;
|
import static org.junit.Assume.assumeTrue;
|
||||||
|
import static org.mockito.ArgumentMatchers.any;
|
||||||
import static org.mockito.Mockito.doAnswer;
|
import static org.mockito.Mockito.doAnswer;
|
||||||
import static org.mockito.Mockito.mock;
|
import static org.mockito.Mockito.mock;
|
||||||
import static org.mockito.Mockito.spy;
|
import static org.mockito.Mockito.spy;
|
||||||
import static org.mockito.Mockito.when;
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
@RunWith(Parameterized.class)
|
@RunWith(Parameterized.class)
|
||||||
public class JournalStorageManagerTest {
|
public class JournalStorageManagerTest extends ActiveMQTestBase {
|
||||||
|
|
||||||
@Parameterized.Parameter
|
@Parameterized.Parameter
|
||||||
public JournalType journalType;
|
public JournalType journalType;
|
||||||
|
|
||||||
@Parameterized.Parameters(name = "journal type={0}")
|
@Parameterized.Parameters(name = "journal type={0}")
|
||||||
public static Collection<Object[]> getParams() {
|
public static Collection<Object[]> getParams() {
|
||||||
final JournalType[] values = JournalType.values();
|
|
||||||
return Stream.of(JournalType.values())
|
return Stream.of(JournalType.values())
|
||||||
.map(journalType -> new Object[]{journalType})
|
.map(journalType -> new Object[]{journalType})
|
||||||
.collect(toList());
|
.collect(toList());
|
||||||
|
@ -89,14 +95,14 @@ public class JournalStorageManagerTest {
|
||||||
* Test of fixJournalFileSize method, of class JournalStorageManager.
|
* Test of fixJournalFileSize method, of class JournalStorageManager.
|
||||||
*/
|
*/
|
||||||
@Test
|
@Test
|
||||||
public void testFixJournalFileSize() {
|
public void testFixJournalFileSize() throws Exception {
|
||||||
if (journalType == JournalType.ASYNCIO) {
|
if (journalType == JournalType.ASYNCIO) {
|
||||||
assumeTrue("AIO is not supported on this platform", AIOSequentialFileFactory.isSupported());
|
assumeTrue("AIO is not supported on this platform", AIOSequentialFileFactory.isSupported());
|
||||||
}
|
}
|
||||||
final ConfigurationImpl configuration = new ConfigurationImpl().setJournalType(journalType);
|
final Configuration configuration = createDefaultInVMConfig().setJournalType(journalType);
|
||||||
final ExecutorFactory executorFactory = new OrderedExecutorFactory(executor);
|
final ExecutorFactory executorFactory = new OrderedExecutorFactory(executor);
|
||||||
final ExecutorFactory ioExecutorFactory = new OrderedExecutorFactory(ioExecutor);
|
final ExecutorFactory ioExecutorFactory = new OrderedExecutorFactory(ioExecutor);
|
||||||
final JournalStorageManager manager = spy(new JournalStorageManager(configuration, null, executorFactory, null, ioExecutorFactory));
|
final JournalStorageManager manager = new JournalStorageManager(configuration, null, executorFactory, null, ioExecutorFactory);
|
||||||
Assert.assertEquals(4096, manager.fixJournalFileSize(1024, 4096));
|
Assert.assertEquals(4096, manager.fixJournalFileSize(1024, 4096));
|
||||||
Assert.assertEquals(4096, manager.fixJournalFileSize(4098, 4096));
|
Assert.assertEquals(4096, manager.fixJournalFileSize(4098, 4096));
|
||||||
Assert.assertEquals(8192, manager.fixJournalFileSize(8192, 4096));
|
Assert.assertEquals(8192, manager.fixJournalFileSize(8192, 4096));
|
||||||
|
@ -107,7 +113,7 @@ public class JournalStorageManagerTest {
|
||||||
if (journalType == JournalType.ASYNCIO) {
|
if (journalType == JournalType.ASYNCIO) {
|
||||||
assumeTrue("AIO is not supported on this platform", AIOSequentialFileFactory.isSupported());
|
assumeTrue("AIO is not supported on this platform", AIOSequentialFileFactory.isSupported());
|
||||||
}
|
}
|
||||||
final ConfigurationImpl configuration = new ConfigurationImpl().setJournalType(journalType);
|
final Configuration configuration = createDefaultInVMConfig().setJournalType(journalType);
|
||||||
final ExecutorFactory executorFactory = new OrderedExecutorFactory(executor);
|
final ExecutorFactory executorFactory = new OrderedExecutorFactory(executor);
|
||||||
final ExecutorFactory ioExecutorFactory = new OrderedExecutorFactory(ioExecutor);
|
final ExecutorFactory ioExecutorFactory = new OrderedExecutorFactory(ioExecutor);
|
||||||
final JournalStorageManager manager = spy(new JournalStorageManager(configuration, null, executorFactory, null, ioExecutorFactory));
|
final JournalStorageManager manager = spy(new JournalStorageManager(configuration, null, executorFactory, null, ioExecutorFactory));
|
||||||
|
@ -164,4 +170,46 @@ public class JournalStorageManagerTest {
|
||||||
stoppedReplication.get();
|
stoppedReplication.get();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testAddBytesToLargeMessageNotLeakingByteBuffer() throws Exception {
|
||||||
|
if (journalType == JournalType.ASYNCIO) {
|
||||||
|
assumeTrue("AIO is not supported on this platform", AIOSequentialFileFactory.isSupported());
|
||||||
|
}
|
||||||
|
final Configuration configuration = createDefaultInVMConfig().setJournalType(journalType);
|
||||||
|
final ExecutorFactory executorFactory = new OrderedExecutorFactory(executor);
|
||||||
|
final ExecutorFactory ioExecutorFactory = new OrderedExecutorFactory(ioExecutor);
|
||||||
|
final JournalStorageManager manager = new JournalStorageManager(configuration, null, executorFactory, null, ioExecutorFactory);
|
||||||
|
manager.largeMessagesFactory = spy(manager.largeMessagesFactory);
|
||||||
|
manager.start();
|
||||||
|
manager.loadBindingJournal(new ArrayList<>(), new ArrayList<>(), new ArrayList<>());
|
||||||
|
final PostOffice postOffice = mock(PostOffice.class);
|
||||||
|
final JournalLoader journalLoader = mock(JournalLoader.class);
|
||||||
|
manager.loadMessageJournal(postOffice, null, null, null, null, null, null, journalLoader);
|
||||||
|
final long id = manager.generateID() + 1;
|
||||||
|
final SequentialFile file = manager.createFileForLargeMessage(id, false);
|
||||||
|
try {
|
||||||
|
file.open();
|
||||||
|
doAnswer(invocation -> {
|
||||||
|
Assert.fail("No buffer should leak into the factory pool while writing into a large message");
|
||||||
|
return invocation.callRealMethod();
|
||||||
|
}).when(manager.largeMessagesFactory).releaseBuffer(any(ByteBuffer.class));
|
||||||
|
final int size = 100;
|
||||||
|
final ByteBuffer byteBuffer = ByteBuffer.allocateDirect(size);
|
||||||
|
final ActiveMQBuffer directBuffer = ActiveMQBuffers.wrappedBuffer(byteBuffer);
|
||||||
|
directBuffer.writerIndex(size);
|
||||||
|
long fileSize = file.size();
|
||||||
|
manager.addBytesToLargeMessage(file, 1, directBuffer);
|
||||||
|
Assert.assertThat(file.size(), is(fileSize + size));
|
||||||
|
fileSize = file.size();
|
||||||
|
final ActiveMQBuffer heapBuffer = ActiveMQBuffers.wrappedBuffer(new byte[size]);
|
||||||
|
heapBuffer.writerIndex(size);
|
||||||
|
manager.addBytesToLargeMessage(file, 1, heapBuffer);
|
||||||
|
Assert.assertThat(file.size(), is(fileSize + size));
|
||||||
|
} finally {
|
||||||
|
manager.stop();
|
||||||
|
file.close();
|
||||||
|
file.delete();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
1
pom.xml
1
pom.xml
|
@ -117,6 +117,7 @@
|
||||||
<version.org.jacoco.plugin>0.7.9</version.org.jacoco.plugin>
|
<version.org.jacoco.plugin>0.7.9</version.org.jacoco.plugin>
|
||||||
<version.maven.jar.plugin>2.4</version.maven.jar.plugin>
|
<version.maven.jar.plugin>2.4</version.maven.jar.plugin>
|
||||||
<version.micrometer>1.1.4</version.micrometer>
|
<version.micrometer>1.1.4</version.micrometer>
|
||||||
|
<hamcrest.version>2.1</hamcrest.version>
|
||||||
|
|
||||||
<!-- used on tests -->
|
<!-- used on tests -->
|
||||||
<groovy.version>2.4.3</groovy.version>
|
<groovy.version>2.4.3</groovy.version>
|
||||||
|
|
Loading…
Reference in New Issue