This closes #2484
This commit is contained in:
commit
4e55c6418c
|
@ -69,6 +69,11 @@ public interface PagingStore extends ActiveMQComponent, RefCountMessageListener
|
|||
|
||||
void applySetting(AddressSettings addressSettings);
|
||||
|
||||
/** This method will look if the current state of paging is not paging,
|
||||
* without using a lock.
|
||||
* For cases where you need absolutely atomic results, check it directly on the internal variables while requiring a readLock.
|
||||
*
|
||||
* It's ok to look for this with an estimate on starting a task or not, but you will need to recheck on actual paging operations. */
|
||||
boolean isPaging();
|
||||
|
||||
/**
|
||||
|
|
|
@ -152,7 +152,7 @@ public class PagingStoreFactoryNIO implements PagingStoreFactory {
|
|||
@Override
|
||||
public synchronized PagingStore newStore(final SimpleString address, final AddressSettings settings) {
|
||||
|
||||
return new PagingStoreImpl(address, scheduledExecutor, syncTimeout, pagingManager, storageManager, null, this, address, settings, executorFactory.getExecutor(), syncNonTransactional);
|
||||
return new PagingStoreImpl(address, scheduledExecutor, syncTimeout, pagingManager, storageManager, null, this, address, settings, executorFactory.getExecutor(), executorFactory.getExecutor(), syncNonTransactional);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -223,7 +223,7 @@ public class PagingStoreFactoryNIO implements PagingStoreFactory {
|
|||
|
||||
AddressSettings settings = addressSettingsRepository.getMatch(address.toString());
|
||||
|
||||
PagingStore store = new PagingStoreImpl(address, scheduledExecutor, syncTimeout, pagingManager, storageManager, factory, this, address, settings, executorFactory.getExecutor(), syncNonTransactional);
|
||||
PagingStore store = new PagingStoreImpl(address, scheduledExecutor, syncTimeout, pagingManager, storageManager, factory, this, address, settings, executorFactory.getExecutor(), executorFactory.getExecutor(), syncNonTransactional);
|
||||
|
||||
storesReturn.add(store);
|
||||
}
|
||||
|
|
|
@ -140,6 +140,21 @@ public class PagingStoreImpl implements PagingStore {
|
|||
final AddressSettings addressSettings,
|
||||
final ArtemisExecutor executor,
|
||||
final boolean syncNonTransactional) {
|
||||
this(address, scheduledExecutor, syncTimeout, pagingManager, storageManager, fileFactory, storeFactory, storeName, addressSettings, executor, executor, syncNonTransactional);
|
||||
}
|
||||
|
||||
public PagingStoreImpl(final SimpleString address,
|
||||
final ScheduledExecutorService scheduledExecutor,
|
||||
final long syncTimeout,
|
||||
final PagingManager pagingManager,
|
||||
final StorageManager storageManager,
|
||||
final SequentialFileFactory fileFactory,
|
||||
final PagingStoreFactory storeFactory,
|
||||
final SimpleString storeName,
|
||||
final AddressSettings addressSettings,
|
||||
final ArtemisExecutor executor,
|
||||
final ArtemisExecutor ioExecutor,
|
||||
final boolean syncNonTransactional) {
|
||||
if (pagingManager == null) {
|
||||
throw new IllegalStateException("Paging Manager can't be null");
|
||||
}
|
||||
|
@ -172,7 +187,7 @@ public class PagingStoreImpl implements PagingStore {
|
|||
this.syncNonTransactional = syncNonTransactional;
|
||||
|
||||
if (scheduledExecutor != null && syncTimeout > 0) {
|
||||
this.syncTimer = new PageSyncTimer(this, scheduledExecutor, executor, syncTimeout);
|
||||
this.syncTimer = new PageSyncTimer(this, scheduledExecutor, ioExecutor, syncTimeout);
|
||||
} else {
|
||||
this.syncTimer = null;
|
||||
}
|
||||
|
@ -275,22 +290,17 @@ public class PagingStoreImpl implements PagingStore {
|
|||
|
||||
@Override
|
||||
public boolean isPaging() {
|
||||
lock.readLock().lock();
|
||||
|
||||
try {
|
||||
if (addressFullMessagePolicy == AddressFullMessagePolicy.BLOCK) {
|
||||
return false;
|
||||
}
|
||||
if (addressFullMessagePolicy == AddressFullMessagePolicy.FAIL) {
|
||||
return isFull();
|
||||
}
|
||||
if (addressFullMessagePolicy == AddressFullMessagePolicy.DROP) {
|
||||
return isFull();
|
||||
}
|
||||
return paging;
|
||||
} finally {
|
||||
lock.readLock().unlock();
|
||||
AddressFullMessagePolicy policy = this.addressFullMessagePolicy;
|
||||
if (policy == AddressFullMessagePolicy.BLOCK) {
|
||||
return false;
|
||||
}
|
||||
if (policy == AddressFullMessagePolicy.FAIL) {
|
||||
return isFull();
|
||||
}
|
||||
if (policy == AddressFullMessagePolicy.DROP) {
|
||||
return isFull();
|
||||
}
|
||||
return paging;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -483,12 +493,7 @@ public class PagingStoreImpl implements PagingStore {
|
|||
|
||||
lock.readLock().lock();
|
||||
try {
|
||||
// I'm not calling isPaging() here because
|
||||
// isPaging will perform extra steps.
|
||||
// at this context it doesn't really matter what policy we are using
|
||||
// since this method is only called when paging.
|
||||
// Besides that isPaging() will perform lock.readLock() again which is not needed here
|
||||
// for that reason the attribute is used directly here.
|
||||
// I'm not calling isPaging() here because i need to be atomic and hold a lock.
|
||||
if (paging) {
|
||||
return false;
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue