ARTEMIS-2210 PagingStore creation is not properly synchronized

In PagingManagerImpl#getPageStore() the operations on the map 'stores'
are not synchronzed and it's possible that more than one paging store is
created for one address.
This commit is contained in:
Howard Gao 2018-12-25 10:09:03 +08:00 committed by Clebert Suconic
parent 4179c44e7e
commit 70f8297cfa
1 changed files with 19 additions and 11 deletions

View File

@ -335,19 +335,30 @@ public final class PagingManagerImpl implements PagingManager {
}
/**
* stores is a ConcurrentHashMap, so we don't need to synchronize this method
* This method creates a new store if not exist.
*/
@Override
public PagingStore getPageStore(final SimpleString storeName) throws Exception {
if (managementAddress != null && storeName.startsWith(managementAddress)) {
return null;
}
PagingStore store = stores.get(storeName);
PagingStore store = stores.get(storeName);
if (store != null) {
return store;
}
return newStore(storeName);
//only if store is null we use computeIfAbsent
try {
return stores.computeIfAbsent(storeName, (s) -> {
try {
return newStore(s);
} catch (Exception e) {
throw new RuntimeException(e);
}
});
} catch (RuntimeException e) {
throw (Exception) e.getCause();
}
}
@Override
@ -450,19 +461,16 @@ public final class PagingManagerImpl implements PagingManager {
}
}
//any caller that calls this method must guarantee the store doesn't exist.
private PagingStore newStore(final SimpleString address) throws Exception {
assert managementAddress == null || (managementAddress != null && !address.startsWith(managementAddress));
syncLock.readLock().lock();
try {
PagingStore store = stores.get(address);
if (store == null) {
store = pagingStoreFactory.newStore(address, addressSettingsRepository.getMatch(address.toString()));
PagingStore store = pagingStoreFactory.newStore(address, addressSettingsRepository.getMatch(address.toString()));
store.start();
if (!cleanupEnabled) {
store.disableCleanup();
}
stores.put(address, store);
}
return store;
} finally {
syncLock.readLock().unlock();