ARTEMIS-2216 Use a specific executor for pageSyncTimer

This commit is contained in:
Qihong Xu 2019-01-04 18:09:24 +08:00 committed by Clebert Suconic
parent 5ae3dba072
commit e6fe9f9d92
2 changed files with 28 additions and 18 deletions

View File

@ -152,7 +152,7 @@ public class PagingStoreFactoryNIO implements PagingStoreFactory {
@Override @Override
public synchronized PagingStore newStore(final SimpleString address, final AddressSettings settings) { public synchronized PagingStore newStore(final SimpleString address, final AddressSettings settings) {
return new PagingStoreImpl(address, scheduledExecutor, syncTimeout, pagingManager, storageManager, null, this, address, settings, executorFactory.getExecutor(), syncNonTransactional); return new PagingStoreImpl(address, scheduledExecutor, syncTimeout, pagingManager, storageManager, null, this, address, settings, executorFactory.getExecutor(), executorFactory.getExecutor(), syncNonTransactional);
} }
@Override @Override
@ -223,7 +223,7 @@ public class PagingStoreFactoryNIO implements PagingStoreFactory {
AddressSettings settings = addressSettingsRepository.getMatch(address.toString()); AddressSettings settings = addressSettingsRepository.getMatch(address.toString());
PagingStore store = new PagingStoreImpl(address, scheduledExecutor, syncTimeout, pagingManager, storageManager, factory, this, address, settings, executorFactory.getExecutor(), syncNonTransactional); PagingStore store = new PagingStoreImpl(address, scheduledExecutor, syncTimeout, pagingManager, storageManager, factory, this, address, settings, executorFactory.getExecutor(), executorFactory.getExecutor(), syncNonTransactional);
storesReturn.add(store); storesReturn.add(store);
} }

View File

@ -140,6 +140,21 @@ public class PagingStoreImpl implements PagingStore {
final AddressSettings addressSettings, final AddressSettings addressSettings,
final ArtemisExecutor executor, final ArtemisExecutor executor,
final boolean syncNonTransactional) { final boolean syncNonTransactional) {
this(address, scheduledExecutor, syncTimeout, pagingManager, storageManager, fileFactory, storeFactory, storeName, addressSettings, executor, executor, syncNonTransactional);
}
public PagingStoreImpl(final SimpleString address,
final ScheduledExecutorService scheduledExecutor,
final long syncTimeout,
final PagingManager pagingManager,
final StorageManager storageManager,
final SequentialFileFactory fileFactory,
final PagingStoreFactory storeFactory,
final SimpleString storeName,
final AddressSettings addressSettings,
final ArtemisExecutor executor,
final ArtemisExecutor ioExecutor,
final boolean syncNonTransactional) {
if (pagingManager == null) { if (pagingManager == null) {
throw new IllegalStateException("Paging Manager can't be null"); throw new IllegalStateException("Paging Manager can't be null");
} }
@ -172,7 +187,7 @@ public class PagingStoreImpl implements PagingStore {
this.syncNonTransactional = syncNonTransactional; this.syncNonTransactional = syncNonTransactional;
if (scheduledExecutor != null && syncTimeout > 0) { if (scheduledExecutor != null && syncTimeout > 0) {
this.syncTimer = new PageSyncTimer(this, scheduledExecutor, executor, syncTimeout); this.syncTimer = new PageSyncTimer(this, scheduledExecutor, ioExecutor, syncTimeout);
} else { } else {
this.syncTimer = null; this.syncTimer = null;
} }
@ -275,22 +290,17 @@ public class PagingStoreImpl implements PagingStore {
@Override @Override
public boolean isPaging() { public boolean isPaging() {
lock.readLock().lock(); AddressFullMessagePolicy policy = this.addressFullMessagePolicy;
if (policy == AddressFullMessagePolicy.BLOCK) {
try {
if (addressFullMessagePolicy == AddressFullMessagePolicy.BLOCK) {
return false; return false;
} }
if (addressFullMessagePolicy == AddressFullMessagePolicy.FAIL) { if (policy == AddressFullMessagePolicy.FAIL) {
return isFull(); return isFull();
} }
if (addressFullMessagePolicy == AddressFullMessagePolicy.DROP) { if (policy == AddressFullMessagePolicy.DROP) {
return isFull(); return isFull();
} }
return paging; return paging;
} finally {
lock.readLock().unlock();
}
} }
@Override @Override