ARTEMIS-1700 Using IOExecutors for more IO tasks

This commit is contained in:
Clebert Suconic 2018-02-27 11:36:51 -05:00
parent f6c5408b0e
commit 7e06a2b192
5 changed files with 13 additions and 11 deletions

View File

@ -533,7 +533,9 @@ public class PageCursorProviderImpl implements PageCursorProvider {
cursor.confirmPosition(new PagePositionImpl(currentPage.getPageId(), -1));
}
while (!storageManager.waitOnOperations(5000)) {
// we just need to make sure the storage is done..
// if the thread pool is full, we will just log it once instead of looping
if (!storageManager.waitOnOperations(5000)) {
ActiveMQServerLogger.LOGGER.problemCompletingOperations(storageManager.getContext());
}
} finally {

View File

@ -155,7 +155,7 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
protected BatchingIDGenerator idGenerator;
protected final ExecutorFactory ioExecutors;
protected final ExecutorFactory ioExecutorFactory;
protected final ScheduledExecutorService scheduledExecutorService;
@ -197,15 +197,15 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
final CriticalAnalyzer analyzer,
final ExecutorFactory executorFactory,
final ScheduledExecutorService scheduledExecutorService,
final ExecutorFactory ioExecutors) {
this(config, analyzer, executorFactory, scheduledExecutorService, ioExecutors, null);
final ExecutorFactory ioExecutorFactory) {
this(config, analyzer, executorFactory, scheduledExecutorService, ioExecutorFactory, null);
}
public AbstractJournalStorageManager(Configuration config,
CriticalAnalyzer analyzer,
ExecutorFactory executorFactory,
ScheduledExecutorService scheduledExecutorService,
ExecutorFactory ioExecutors,
ExecutorFactory ioExecutorFactory,
IOCriticalErrorListener criticalErrorListener) {
super(analyzer, CRITICAL_PATHS);
@ -213,7 +213,7 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
this.ioCriticalErrorListener = criticalErrorListener;
this.ioExecutors = ioExecutors;
this.ioExecutorFactory = ioExecutorFactory;
this.scheduledExecutorService = scheduledExecutorService;
@ -1519,7 +1519,7 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
beforeStart();
singleThreadExecutor = executorFactory.getExecutor();
singleThreadExecutor = ioExecutorFactory.getExecutor();
bindingsJournal.start();

View File

@ -129,7 +129,7 @@ public class JournalStorageManager extends AbstractJournalStorageManager {
bindingsFF = new NIOSequentialFileFactory(config.getBindingsLocation(), criticalErrorListener, config.getJournalMaxIO_NIO());
bindingsFF.setDatasync(config.isJournalDatasync());
Journal localBindings = new JournalImpl(ioExecutors, 1024 * 1024, 2, config.getJournalCompactMinFiles(), config.getJournalPoolFiles(), config.getJournalCompactPercentage(), config.getJournalFileOpenTimeout(), bindingsFF, "activemq-bindings", "bindings", 1, 0, criticalErrorListener);
Journal localBindings = new JournalImpl(ioExecutorFactory, 1024 * 1024, 2, config.getJournalCompactMinFiles(), config.getJournalPoolFiles(), config.getJournalCompactPercentage(), config.getJournalFileOpenTimeout(), bindingsFF, "activemq-bindings", "bindings", 1, 0, criticalErrorListener);
bindingsJournal = localBindings;
originalBindingsJournal = localBindings;
@ -184,7 +184,7 @@ public class JournalStorageManager extends AbstractJournalStorageManager {
protected Journal createMessageJournal(Configuration config,
IOCriticalErrorListener criticalErrorListener,
int fileSize) {
return new JournalImpl(ioExecutors, fileSize, config.getJournalMinFiles(), config.getJournalPoolFiles(), config.getJournalCompactMinFiles(), config.getJournalCompactPercentage(), config.getJournalFileOpenTimeout(), journalFF, "activemq-data", "amq", journalFF.getMaxIO(), 0, criticalErrorListener);
return new JournalImpl(ioExecutorFactory, fileSize, config.getJournalMinFiles(), config.getJournalPoolFiles(), config.getJournalCompactMinFiles(), config.getJournalCompactPercentage(), config.getJournalFileOpenTimeout(), journalFF, "activemq-data", "amq", journalFF.getMaxIO(), 0, criticalErrorListener);
}
// Life Cycle Handlers

View File

@ -262,7 +262,7 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon
journalLoadInformation[jc.typeByte] = journalsHolder.get(jc).loadSyncOnly(JournalState.SYNCING);
}
pageManager = new PagingManagerImpl(new PagingStoreFactoryNIO(storageManager, config.getPagingLocation(), config.getJournalBufferTimeout_NIO(), server.getScheduledPool(), server.getExecutorFactory(), config.isJournalSyncNonTransactional(), criticalErrorListener), server.getAddressSettingsRepository());
pageManager = new PagingManagerImpl(new PagingStoreFactoryNIO(storageManager, config.getPagingLocation(), config.getJournalBufferTimeout_NIO(), server.getScheduledPool(), server.getIOExecutorFactory(), config.isJournalSyncNonTransactional(), criticalErrorListener), server.getAddressSettingsRepository());
pageManager.start();

View File

@ -99,7 +99,7 @@ public class ShutdownOnCriticalIOErrorMoveNextTest extends ActiveMQTestBase {
protected Journal createMessageJournal(Configuration config,
IOCriticalErrorListener criticalErrorListener,
int fileSize) {
return new JournalImpl(ioExecutors, fileSize, config.getJournalMinFiles(), config.getJournalPoolFiles(), config.getJournalCompactMinFiles(), config.getJournalCompactPercentage(), config.getJournalFileOpenTimeout(), journalFF, "activemq-data", "amq", journalFF.getMaxIO(), 0, criticalErrorListener) {
return new JournalImpl(ioExecutorFactory, fileSize, config.getJournalMinFiles(), config.getJournalPoolFiles(), config.getJournalCompactMinFiles(), config.getJournalCompactPercentage(), config.getJournalFileOpenTimeout(), journalFF, "activemq-data", "amq", journalFF.getMaxIO(), 0, criticalErrorListener) {
@Override
protected void moveNextFile(boolean scheduleReclaim) throws Exception {
super.moveNextFile(scheduleReclaim);