diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFile.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFile.java index 891bd5cce2..55654b7d5e 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFile.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFile.java @@ -37,7 +37,7 @@ import org.apache.activemq.artemis.journal.ActiveMQJournalBundle; import org.apache.activemq.artemis.journal.ActiveMQJournalLogger; import org.apache.activemq.artemis.utils.Env; -public final class NIOSequentialFile extends AbstractSequentialFile { +public class NIOSequentialFile extends AbstractSequentialFile { private FileChannel channel; diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFileFactory.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFileFactory.java index b585b24667..c14237771a 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFileFactory.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFileFactory.java @@ -29,7 +29,7 @@ import org.apache.activemq.artemis.core.io.SequentialFile; import org.apache.activemq.artemis.utils.Env; import org.apache.activemq.artemis.utils.critical.CriticalAnalyzer; -public final class NIOSequentialFileFactory extends AbstractSequentialFileFactory { +public class NIOSequentialFileFactory extends AbstractSequentialFileFactory { private static final int DEFAULT_CAPACITY_ALIGNMENT = Env.osPageSize(); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerImpl.java index 44e806742f..bca70cf170 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerImpl.java @@ -62,7 +62,7 @@ public final class PagingManagerImpl implements PagingManager { private final HierarchicalRepository addressSettingsRepository; - private final PagingStoreFactory pagingStoreFactory; + private PagingStoreFactory pagingStoreFactory; private final AtomicLong globalSizeBytes = new AtomicLong(0); @@ -84,6 +84,17 @@ public final class PagingManagerImpl implements PagingManager { // Constructors // -------------------------------------------------------------------------------------------------------------------- + + // for tests.. not part of the API + public void replacePageStoreFactory(PagingStoreFactory factory) { + this.pagingStoreFactory = factory; + } + + // for tests.. not part of the API + public PagingStoreFactory getPagingStoreFactory() { + return pagingStoreFactory; + } + public PagingManagerImpl(final PagingStoreFactory pagingSPI, final HierarchicalRepository addressSettingsRepository, final long maxSize) { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreFactoryNIO.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreFactoryNIO.java index b2e3d4f923..aa71c0ecc8 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreFactoryNIO.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreFactoryNIO.java @@ -63,7 +63,7 @@ public class PagingStoreFactoryNIO implements PagingStoreFactory { private final ExecutorFactory executorFactory; - protected final boolean syncNonTransactional; + private final boolean syncNonTransactional; private PagingManager pagingManager; @@ -71,10 +71,38 @@ public class PagingStoreFactoryNIO implements PagingStoreFactory { private final long syncTimeout; - protected final StorageManager storageManager; + private final StorageManager storageManager; private final IOCriticalErrorListener critialErrorListener; + public File getDirectory() { + return directory; + } + + public ExecutorFactory getExecutorFactory() { + return executorFactory; + } + + public boolean isSyncNonTransactional() { + return syncNonTransactional; + } + + public PagingManager getPagingManager() { + return pagingManager; + } + + public long getSyncTimeout() { + return syncTimeout; + } + + public StorageManager getStorageManager() { + return storageManager; + } + + public IOCriticalErrorListener getCritialErrorListener() { + return critialErrorListener; + } + public PagingStoreFactoryNIO(final StorageManager storageManager, final File directory, final long syncTimeout, @@ -135,9 +163,7 @@ public class PagingStoreFactoryNIO implements PagingStoreFactory { factory.createDirs(); - File fileWithID = new File(directory, guid + - File.separatorChar + - PagingStoreFactoryNIO.ADDRESS_FILE); + File fileWithID = new File(directory, guid + File.separatorChar + PagingStoreFactoryNIO.ADDRESS_FILE); try (BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(fileWithID)))) { writer.write(address.toString()); @@ -197,7 +223,7 @@ public class PagingStoreFactoryNIO implements PagingStoreFactory { } } - private SequentialFileFactory newFileFactory(final String directoryName) { + protected SequentialFileFactory newFileFactory(final String directoryName) { return new NIOSequentialFileFactory(new File(directory, directoryName), false, critialErrorListener, 1); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java index 6e45a8c1af..15d53116e1 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java @@ -43,8 +43,6 @@ import org.apache.activemq.artemis.core.journal.impl.JournalFile; import org.apache.activemq.artemis.core.paging.PagedMessage; import org.apache.activemq.artemis.core.paging.PagingManager; import org.apache.activemq.artemis.core.paging.impl.Page; -import org.apache.activemq.artemis.core.paging.impl.PagingManagerImpl; -import org.apache.activemq.artemis.core.paging.impl.PagingStoreFactoryNIO; import org.apache.activemq.artemis.core.persistence.StorageManager; import org.apache.activemq.artemis.core.persistence.impl.journal.AbstractJournalStorageManager.JournalContent; import org.apache.activemq.artemis.core.persistence.impl.journal.LargeServerMessageInSync; @@ -262,7 +260,8 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon journalLoadInformation[jc.typeByte] = journalsHolder.get(jc).loadSyncOnly(JournalState.SYNCING); } - pageManager = new PagingManagerImpl(new PagingStoreFactoryNIO(storageManager, config.getPagingLocation(), config.getJournalBufferTimeout_NIO(), server.getScheduledPool(), server.getIOExecutorFactory(), config.isJournalSyncNonTransactional(), criticalErrorListener), server.getAddressSettingsRepository()); + + pageManager = server.createPagingManager(); pageManager.start(); @@ -446,6 +445,10 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon } if (data == null) { + // this means close file + if (channel1.isOpen()) { + channel1.close(); + } return; } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java index c2e4cbf2b1..cfc5bb96e9 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java @@ -128,6 +128,8 @@ public interface ActiveMQServer extends ServiceComponent { PagingManager getPagingManager(); + PagingManager createPagingManager() throws Exception; + ManagementService getManagementService(); ActiveMQSecurityManager getSecurityManager(); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java index fb7fbccb6a..4abda08fcf 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java @@ -2132,7 +2132,8 @@ public class ActiveMQServerImpl implements ActiveMQServer { this.queueFactory = factory; } - protected PagingManager createPagingManager() throws Exception { + @Override + public PagingManager createPagingManager() throws Exception { return new PagingManagerImpl(getPagingStoreFactory(), addressSettingsRepository, configuration.getGlobalMaxSize()); } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/SharedNothingReplicationFlowControlTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/SharedNothingReplicationFlowControlTest.java index 923ce3a8fa..8fca5774b9 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/SharedNothingReplicationFlowControlTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/SharedNothingReplicationFlowControlTest.java @@ -18,17 +18,22 @@ package org.apache.activemq.artemis.tests.integration.replication; import java.io.File; +import java.io.IOException; +import java.lang.management.ManagementFactory; import java.util.ArrayList; import java.util.List; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.Interceptor; import org.apache.activemq.artemis.api.core.RoutingType; +import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.client.ClientMessage; import org.apache.activemq.artemis.api.core.client.ClientProducer; import org.apache.activemq.artemis.api.core.client.ClientSession; @@ -40,20 +45,34 @@ import org.apache.activemq.artemis.core.config.Configuration; import org.apache.activemq.artemis.core.config.ha.ReplicaPolicyConfiguration; import org.apache.activemq.artemis.core.config.ha.ReplicatedPolicyConfiguration; import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl; +import org.apache.activemq.artemis.core.config.impl.SecurityConfiguration; +import org.apache.activemq.artemis.core.io.IOCriticalErrorListener; +import org.apache.activemq.artemis.core.io.SequentialFile; import org.apache.activemq.artemis.core.io.SequentialFileFactory; import org.apache.activemq.artemis.core.io.mapped.MappedSequentialFileFactory; +import org.apache.activemq.artemis.core.io.nio.NIOSequentialFile; +import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory; import org.apache.activemq.artemis.core.journal.LoaderCallback; import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo; import org.apache.activemq.artemis.core.journal.RecordInfo; import org.apache.activemq.artemis.core.journal.impl.JournalImpl; +import org.apache.activemq.artemis.core.paging.PagingManager; +import org.apache.activemq.artemis.core.paging.PagingStore; +import org.apache.activemq.artemis.core.paging.impl.PagingManagerImpl; +import org.apache.activemq.artemis.core.paging.impl.PagingStoreFactoryNIO; +import org.apache.activemq.artemis.core.persistence.StorageManager; import org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds; import org.apache.activemq.artemis.core.protocol.core.Packet; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServers; import org.apache.activemq.artemis.core.server.JournalType; +import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl; import org.apache.activemq.artemis.junit.Wait; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; +import org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager; +import org.apache.activemq.artemis.spi.core.security.jaas.InVMLoginModule; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; +import org.apache.activemq.artemis.utils.ExecutorFactory; import org.jboss.logging.Logger; import org.junit.After; import org.junit.Assert; @@ -66,7 +85,6 @@ public class SharedNothingReplicationFlowControlTest extends ActiveMQTestBase { ExecutorService sendMessageExecutor; - @Before public void setupExecutor() { sendMessageExecutor = Executors.newCachedThreadPool(); @@ -100,7 +118,6 @@ public class SharedNothingReplicationFlowControlTest extends ActiveMQTestBase { sess.createQueue("flowcontrol", RoutingType.ANYCAST, "flowcontrol", true); sess.close(); - int i = 0; final int j = 100; final CountDownLatch allMessageSent = new CountDownLatch(j); @@ -194,6 +211,124 @@ public class SharedNothingReplicationFlowControlTest extends ActiveMQTestBase { Assert.assertEquals("Backup did not replicated all journal", j, replicationCounter.get()); } + @Test + public void testSendPages() throws Exception { + // start live + Configuration liveConfiguration = createLiveConfiguration(); + ActiveMQServer liveServer = addServer(ActiveMQServers.newActiveMQServer(liveConfiguration)); + liveServer.start(); + + Wait.waitFor(() -> liveServer.isStarted()); + + ServerLocator locator = ServerLocatorImpl.newLocator("tcp://localhost:61616"); + locator.setCallTimeout(60_000L); + locator.setConnectionTTL(60_000L); + + final ClientSessionFactory csf = locator.createSessionFactory(); + ClientSession sess = csf.createSession(); + sess.createQueue("flowcontrol", RoutingType.ANYCAST, "flowcontrol", true); + + PagingStore store = liveServer.getPagingManager().getPageStore(SimpleString.toSimpleString("flowcontrol")); + store.startPaging(); + + ClientProducer prod = sess.createProducer("flowcontrol"); + for (int i = 0; i < 100; i++) { + prod.send(sess.createMessage(true)); + + if (i % 10 == 0) { + sess.commit(); + store.forceAnotherPage(); + } + } + + sess.close(); + + openCount.set(0); + closeCount.set(0); + // start backup + Configuration backupConfiguration = createBackupConfiguration().setNetworkCheckURLList(null); + + ActiveMQServer backupServer = new ActiveMQServerImpl(backupConfiguration, ManagementFactory.getPlatformMBeanServer(), new ActiveMQJAASSecurityManager(InVMLoginModule.class.getName(), new SecurityConfiguration())) { + @Override + public PagingManager createPagingManager() throws Exception { + PagingManagerImpl manager = (PagingManagerImpl) super.createPagingManager(); + PagingStoreFactoryNIO originalPageStore = (PagingStoreFactoryNIO) manager.getPagingStoreFactory(); + manager.replacePageStoreFactory(new PageStoreFactoryTestable(originalPageStore)); + return manager; + } + }; + + addServer(backupServer).start(); + + Wait.waitFor(() -> backupServer.isStarted()); + + Wait.waitFor(backupServer::isReplicaSync, 30000); + + PageStoreFactoryTestable testablePageStoreFactory = (PageStoreFactoryTestable) ((PagingManagerImpl) backupServer.getPagingManager()).getPagingStoreFactory(); + + Assert.assertEquals(openCount.get(), closeCount.get()); + } + + static AtomicInteger openCount = new AtomicInteger(0); + static AtomicInteger closeCount = new AtomicInteger(0); + + private static class PageStoreFactoryTestable extends PagingStoreFactoryNIO { + + PageStoreFactoryTestable(StorageManager storageManager, + File directory, + long syncTimeout, + ScheduledExecutorService scheduledExecutor, + ExecutorFactory executorFactory, + boolean syncNonTransactional, + IOCriticalErrorListener critialErrorListener) { + super(storageManager, directory, syncTimeout, scheduledExecutor, executorFactory, syncNonTransactional, critialErrorListener); + } + + PageStoreFactoryTestable(PagingStoreFactoryNIO other) { + this(other.getStorageManager(), other.getDirectory(), other.getSyncTimeout(), other.getScheduledExecutor(), other.getExecutorFactory(), other.isSyncNonTransactional(), other.getCritialErrorListener()); + } + + @Override + protected SequentialFileFactory newFileFactory(String directoryName) { + return new TestableNIOFactory(new File(getDirectory(), directoryName), false, getCritialErrorListener(), 1); + } + } + + public static class TestableNIOFactory extends NIOSequentialFileFactory { + + public TestableNIOFactory(File journalDir, boolean buffered, IOCriticalErrorListener listener, int maxIO) { + super(journalDir, buffered, listener, maxIO); + } + + @Override + public SequentialFile createSequentialFile(String fileName) { + return new TestableSequentialFile(this, journalDir, fileName, maxIO, writeExecutor); + } + } + + public static class TestableSequentialFile extends NIOSequentialFile { + + public TestableSequentialFile(SequentialFileFactory factory, + File directory, + String file, + int maxIO, + Executor writerExecutor) { + super(factory, directory, file, maxIO, writerExecutor); + } + + @Override + public void open(int maxIO, boolean useExecutor) throws IOException { + super.open(maxIO, useExecutor); + openCount.incrementAndGet(); + } + + @Override + public synchronized void close() throws IOException, InterruptedException, ActiveMQException { + super.close(); + closeCount.incrementAndGet(); + } + } + // Set a small call timeout and write buffer high water mark value to trigger replication flow control private Configuration createLiveConfiguration() throws Exception { Configuration conf = new ConfigurationImpl();