This commit is contained in:
Martyn Taylor 2018-07-25 15:03:57 +01:00
commit 845345313a
8 changed files with 193 additions and 15 deletions

View File

@ -37,7 +37,7 @@ import org.apache.activemq.artemis.journal.ActiveMQJournalBundle;
import org.apache.activemq.artemis.journal.ActiveMQJournalLogger; import org.apache.activemq.artemis.journal.ActiveMQJournalLogger;
import org.apache.activemq.artemis.utils.Env; import org.apache.activemq.artemis.utils.Env;
public final class NIOSequentialFile extends AbstractSequentialFile { public class NIOSequentialFile extends AbstractSequentialFile {
private FileChannel channel; private FileChannel channel;

View File

@ -29,7 +29,7 @@ import org.apache.activemq.artemis.core.io.SequentialFile;
import org.apache.activemq.artemis.utils.Env; import org.apache.activemq.artemis.utils.Env;
import org.apache.activemq.artemis.utils.critical.CriticalAnalyzer; import org.apache.activemq.artemis.utils.critical.CriticalAnalyzer;
public final class NIOSequentialFileFactory extends AbstractSequentialFileFactory { public class NIOSequentialFileFactory extends AbstractSequentialFileFactory {
private static final int DEFAULT_CAPACITY_ALIGNMENT = Env.osPageSize(); private static final int DEFAULT_CAPACITY_ALIGNMENT = Env.osPageSize();

View File

@ -62,7 +62,7 @@ public final class PagingManagerImpl implements PagingManager {
private final HierarchicalRepository<AddressSettings> addressSettingsRepository; private final HierarchicalRepository<AddressSettings> addressSettingsRepository;
private final PagingStoreFactory pagingStoreFactory; private PagingStoreFactory pagingStoreFactory;
private final AtomicLong globalSizeBytes = new AtomicLong(0); private final AtomicLong globalSizeBytes = new AtomicLong(0);
@ -84,6 +84,17 @@ public final class PagingManagerImpl implements PagingManager {
// Constructors // Constructors
// -------------------------------------------------------------------------------------------------------------------- // --------------------------------------------------------------------------------------------------------------------
// for tests.. not part of the API
public void replacePageStoreFactory(PagingStoreFactory factory) {
this.pagingStoreFactory = factory;
}
// for tests.. not part of the API
public PagingStoreFactory getPagingStoreFactory() {
return pagingStoreFactory;
}
public PagingManagerImpl(final PagingStoreFactory pagingSPI, public PagingManagerImpl(final PagingStoreFactory pagingSPI,
final HierarchicalRepository<AddressSettings> addressSettingsRepository, final HierarchicalRepository<AddressSettings> addressSettingsRepository,
final long maxSize) { final long maxSize) {

View File

@ -63,7 +63,7 @@ public class PagingStoreFactoryNIO implements PagingStoreFactory {
private final ExecutorFactory executorFactory; private final ExecutorFactory executorFactory;
protected final boolean syncNonTransactional; private final boolean syncNonTransactional;
private PagingManager pagingManager; private PagingManager pagingManager;
@ -71,10 +71,38 @@ public class PagingStoreFactoryNIO implements PagingStoreFactory {
private final long syncTimeout; private final long syncTimeout;
protected final StorageManager storageManager; private final StorageManager storageManager;
private final IOCriticalErrorListener critialErrorListener; private final IOCriticalErrorListener critialErrorListener;
public File getDirectory() {
return directory;
}
public ExecutorFactory getExecutorFactory() {
return executorFactory;
}
public boolean isSyncNonTransactional() {
return syncNonTransactional;
}
public PagingManager getPagingManager() {
return pagingManager;
}
public long getSyncTimeout() {
return syncTimeout;
}
public StorageManager getStorageManager() {
return storageManager;
}
public IOCriticalErrorListener getCritialErrorListener() {
return critialErrorListener;
}
public PagingStoreFactoryNIO(final StorageManager storageManager, public PagingStoreFactoryNIO(final StorageManager storageManager,
final File directory, final File directory,
final long syncTimeout, final long syncTimeout,
@ -135,9 +163,7 @@ public class PagingStoreFactoryNIO implements PagingStoreFactory {
factory.createDirs(); factory.createDirs();
File fileWithID = new File(directory, guid + File fileWithID = new File(directory, guid + File.separatorChar + PagingStoreFactoryNIO.ADDRESS_FILE);
File.separatorChar +
PagingStoreFactoryNIO.ADDRESS_FILE);
try (BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(fileWithID)))) { try (BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(fileWithID)))) {
writer.write(address.toString()); writer.write(address.toString());
@ -197,7 +223,7 @@ public class PagingStoreFactoryNIO implements PagingStoreFactory {
} }
} }
private SequentialFileFactory newFileFactory(final String directoryName) { protected SequentialFileFactory newFileFactory(final String directoryName) {
return new NIOSequentialFileFactory(new File(directory, directoryName), false, critialErrorListener, 1); return new NIOSequentialFileFactory(new File(directory, directoryName), false, critialErrorListener, 1);
} }

View File

@ -43,8 +43,6 @@ import org.apache.activemq.artemis.core.journal.impl.JournalFile;
import org.apache.activemq.artemis.core.paging.PagedMessage; import org.apache.activemq.artemis.core.paging.PagedMessage;
import org.apache.activemq.artemis.core.paging.PagingManager; import org.apache.activemq.artemis.core.paging.PagingManager;
import org.apache.activemq.artemis.core.paging.impl.Page; import org.apache.activemq.artemis.core.paging.impl.Page;
import org.apache.activemq.artemis.core.paging.impl.PagingManagerImpl;
import org.apache.activemq.artemis.core.paging.impl.PagingStoreFactoryNIO;
import org.apache.activemq.artemis.core.persistence.StorageManager; import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.persistence.impl.journal.AbstractJournalStorageManager.JournalContent; import org.apache.activemq.artemis.core.persistence.impl.journal.AbstractJournalStorageManager.JournalContent;
import org.apache.activemq.artemis.core.persistence.impl.journal.LargeServerMessageInSync; import org.apache.activemq.artemis.core.persistence.impl.journal.LargeServerMessageInSync;
@ -262,7 +260,8 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon
journalLoadInformation[jc.typeByte] = journalsHolder.get(jc).loadSyncOnly(JournalState.SYNCING); journalLoadInformation[jc.typeByte] = journalsHolder.get(jc).loadSyncOnly(JournalState.SYNCING);
} }
pageManager = new PagingManagerImpl(new PagingStoreFactoryNIO(storageManager, config.getPagingLocation(), config.getJournalBufferTimeout_NIO(), server.getScheduledPool(), server.getIOExecutorFactory(), config.isJournalSyncNonTransactional(), criticalErrorListener), server.getAddressSettingsRepository());
pageManager = server.createPagingManager();
pageManager.start(); pageManager.start();
@ -446,6 +445,10 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon
} }
if (data == null) { if (data == null) {
// this means close file
if (channel1.isOpen()) {
channel1.close();
}
return; return;
} }

View File

@ -128,6 +128,8 @@ public interface ActiveMQServer extends ServiceComponent {
PagingManager getPagingManager(); PagingManager getPagingManager();
PagingManager createPagingManager() throws Exception;
ManagementService getManagementService(); ManagementService getManagementService();
ActiveMQSecurityManager getSecurityManager(); ActiveMQSecurityManager getSecurityManager();

View File

@ -2132,7 +2132,8 @@ public class ActiveMQServerImpl implements ActiveMQServer {
this.queueFactory = factory; this.queueFactory = factory;
} }
protected PagingManager createPagingManager() throws Exception { @Override
public PagingManager createPagingManager() throws Exception {
return new PagingManagerImpl(getPagingStoreFactory(), addressSettingsRepository, configuration.getGlobalMaxSize()); return new PagingManagerImpl(getPagingStoreFactory(), addressSettingsRepository, configuration.getGlobalMaxSize());
} }

View File

@ -18,17 +18,22 @@
package org.apache.activemq.artemis.tests.integration.replication; package org.apache.activemq.artemis.tests.integration.replication;
import java.io.File; import java.io.File;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.Interceptor; import org.apache.activemq.artemis.api.core.Interceptor;
import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientMessage; import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientProducer; import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientSession; import org.apache.activemq.artemis.api.core.client.ClientSession;
@ -40,20 +45,34 @@ import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.ha.ReplicaPolicyConfiguration; import org.apache.activemq.artemis.core.config.ha.ReplicaPolicyConfiguration;
import org.apache.activemq.artemis.core.config.ha.ReplicatedPolicyConfiguration; import org.apache.activemq.artemis.core.config.ha.ReplicatedPolicyConfiguration;
import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl; import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl;
import org.apache.activemq.artemis.core.config.impl.SecurityConfiguration;
import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
import org.apache.activemq.artemis.core.io.SequentialFile;
import org.apache.activemq.artemis.core.io.SequentialFileFactory; import org.apache.activemq.artemis.core.io.SequentialFileFactory;
import org.apache.activemq.artemis.core.io.mapped.MappedSequentialFileFactory; import org.apache.activemq.artemis.core.io.mapped.MappedSequentialFileFactory;
import org.apache.activemq.artemis.core.io.nio.NIOSequentialFile;
import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory;
import org.apache.activemq.artemis.core.journal.LoaderCallback; import org.apache.activemq.artemis.core.journal.LoaderCallback;
import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo; import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo;
import org.apache.activemq.artemis.core.journal.RecordInfo; import org.apache.activemq.artemis.core.journal.RecordInfo;
import org.apache.activemq.artemis.core.journal.impl.JournalImpl; import org.apache.activemq.artemis.core.journal.impl.JournalImpl;
import org.apache.activemq.artemis.core.paging.PagingManager;
import org.apache.activemq.artemis.core.paging.PagingStore;
import org.apache.activemq.artemis.core.paging.impl.PagingManagerImpl;
import org.apache.activemq.artemis.core.paging.impl.PagingStoreFactoryNIO;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds; import org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds;
import org.apache.activemq.artemis.core.protocol.core.Packet; import org.apache.activemq.artemis.core.protocol.core.Packet;
import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServers; import org.apache.activemq.artemis.core.server.ActiveMQServers;
import org.apache.activemq.artemis.core.server.JournalType; import org.apache.activemq.artemis.core.server.JournalType;
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
import org.apache.activemq.artemis.junit.Wait; import org.apache.activemq.artemis.junit.Wait;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager;
import org.apache.activemq.artemis.spi.core.security.jaas.InVMLoginModule;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.utils.ExecutorFactory;
import org.jboss.logging.Logger; import org.jboss.logging.Logger;
import org.junit.After; import org.junit.After;
import org.junit.Assert; import org.junit.Assert;
@ -66,7 +85,6 @@ public class SharedNothingReplicationFlowControlTest extends ActiveMQTestBase {
ExecutorService sendMessageExecutor; ExecutorService sendMessageExecutor;
@Before @Before
public void setupExecutor() { public void setupExecutor() {
sendMessageExecutor = Executors.newCachedThreadPool(); sendMessageExecutor = Executors.newCachedThreadPool();
@ -100,7 +118,6 @@ public class SharedNothingReplicationFlowControlTest extends ActiveMQTestBase {
sess.createQueue("flowcontrol", RoutingType.ANYCAST, "flowcontrol", true); sess.createQueue("flowcontrol", RoutingType.ANYCAST, "flowcontrol", true);
sess.close(); sess.close();
int i = 0; int i = 0;
final int j = 100; final int j = 100;
final CountDownLatch allMessageSent = new CountDownLatch(j); final CountDownLatch allMessageSent = new CountDownLatch(j);
@ -194,6 +211,124 @@ public class SharedNothingReplicationFlowControlTest extends ActiveMQTestBase {
Assert.assertEquals("Backup did not replicated all journal", j, replicationCounter.get()); Assert.assertEquals("Backup did not replicated all journal", j, replicationCounter.get());
} }
@Test
public void testSendPages() throws Exception {
// start live
Configuration liveConfiguration = createLiveConfiguration();
ActiveMQServer liveServer = addServer(ActiveMQServers.newActiveMQServer(liveConfiguration));
liveServer.start();
Wait.waitFor(() -> liveServer.isStarted());
ServerLocator locator = ServerLocatorImpl.newLocator("tcp://localhost:61616");
locator.setCallTimeout(60_000L);
locator.setConnectionTTL(60_000L);
final ClientSessionFactory csf = locator.createSessionFactory();
ClientSession sess = csf.createSession();
sess.createQueue("flowcontrol", RoutingType.ANYCAST, "flowcontrol", true);
PagingStore store = liveServer.getPagingManager().getPageStore(SimpleString.toSimpleString("flowcontrol"));
store.startPaging();
ClientProducer prod = sess.createProducer("flowcontrol");
for (int i = 0; i < 100; i++) {
prod.send(sess.createMessage(true));
if (i % 10 == 0) {
sess.commit();
store.forceAnotherPage();
}
}
sess.close();
openCount.set(0);
closeCount.set(0);
// start backup
Configuration backupConfiguration = createBackupConfiguration().setNetworkCheckURLList(null);
ActiveMQServer backupServer = new ActiveMQServerImpl(backupConfiguration, ManagementFactory.getPlatformMBeanServer(), new ActiveMQJAASSecurityManager(InVMLoginModule.class.getName(), new SecurityConfiguration())) {
@Override
public PagingManager createPagingManager() throws Exception {
PagingManagerImpl manager = (PagingManagerImpl) super.createPagingManager();
PagingStoreFactoryNIO originalPageStore = (PagingStoreFactoryNIO) manager.getPagingStoreFactory();
manager.replacePageStoreFactory(new PageStoreFactoryTestable(originalPageStore));
return manager;
}
};
addServer(backupServer).start();
Wait.waitFor(() -> backupServer.isStarted());
Wait.waitFor(backupServer::isReplicaSync, 30000);
PageStoreFactoryTestable testablePageStoreFactory = (PageStoreFactoryTestable) ((PagingManagerImpl) backupServer.getPagingManager()).getPagingStoreFactory();
Assert.assertEquals(openCount.get(), closeCount.get());
}
static AtomicInteger openCount = new AtomicInteger(0);
static AtomicInteger closeCount = new AtomicInteger(0);
private static class PageStoreFactoryTestable extends PagingStoreFactoryNIO {
PageStoreFactoryTestable(StorageManager storageManager,
File directory,
long syncTimeout,
ScheduledExecutorService scheduledExecutor,
ExecutorFactory executorFactory,
boolean syncNonTransactional,
IOCriticalErrorListener critialErrorListener) {
super(storageManager, directory, syncTimeout, scheduledExecutor, executorFactory, syncNonTransactional, critialErrorListener);
}
PageStoreFactoryTestable(PagingStoreFactoryNIO other) {
this(other.getStorageManager(), other.getDirectory(), other.getSyncTimeout(), other.getScheduledExecutor(), other.getExecutorFactory(), other.isSyncNonTransactional(), other.getCritialErrorListener());
}
@Override
protected SequentialFileFactory newFileFactory(String directoryName) {
return new TestableNIOFactory(new File(getDirectory(), directoryName), false, getCritialErrorListener(), 1);
}
}
public static class TestableNIOFactory extends NIOSequentialFileFactory {
public TestableNIOFactory(File journalDir, boolean buffered, IOCriticalErrorListener listener, int maxIO) {
super(journalDir, buffered, listener, maxIO);
}
@Override
public SequentialFile createSequentialFile(String fileName) {
return new TestableSequentialFile(this, journalDir, fileName, maxIO, writeExecutor);
}
}
public static class TestableSequentialFile extends NIOSequentialFile {
public TestableSequentialFile(SequentialFileFactory factory,
File directory,
String file,
int maxIO,
Executor writerExecutor) {
super(factory, directory, file, maxIO, writerExecutor);
}
@Override
public void open(int maxIO, boolean useExecutor) throws IOException {
super.open(maxIO, useExecutor);
openCount.incrementAndGet();
}
@Override
public synchronized void close() throws IOException, InterruptedException, ActiveMQException {
super.close();
closeCount.incrementAndGet();
}
}
// Set a small call timeout and write buffer high water mark value to trigger replication flow control // Set a small call timeout and write buffer high water mark value to trigger replication flow control
private Configuration createLiveConfiguration() throws Exception { private Configuration createLiveConfiguration() throws Exception {
Configuration conf = new ConfigurationImpl(); Configuration conf = new ConfigurationImpl();