ARTEMIS-2321 Removed unnecessary volatile/Atomic operations and fields
This commit is contained in:
parent
82898a8a3c
commit
b173bb5552
|
@ -28,8 +28,6 @@ import java.util.Set;
|
|||
import java.util.concurrent.ConcurrentLinkedQueue;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.concurrent.locks.ReadWriteLock;
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||
|
@ -78,7 +76,8 @@ public class PagingStoreImpl implements PagingStore {
|
|||
|
||||
private final DecimalFormat format = new DecimalFormat("000000000");
|
||||
|
||||
private final AtomicInteger currentPageSize = new AtomicInteger(0);
|
||||
//it's being guarded by lock.writeLock().lock() and never read concurrently
|
||||
private int currentPageSize = 0;
|
||||
|
||||
private final SimpleString storeName;
|
||||
|
||||
|
@ -125,7 +124,7 @@ public class PagingStoreImpl implements PagingStore {
|
|||
|
||||
private final boolean syncNonTransactional;
|
||||
|
||||
private volatile AtomicBoolean blocking = new AtomicBoolean(false);
|
||||
private volatile boolean blocking = false;
|
||||
|
||||
private long rejectThreshold;
|
||||
|
||||
|
@ -280,7 +279,7 @@ public class PagingStoreImpl implements PagingStore {
|
|||
|
||||
@Override
|
||||
public File getFolder() {
|
||||
SequentialFileFactory factoryUsed = this.fileFactory;
|
||||
final SequentialFileFactory factoryUsed = this.fileFactory;
|
||||
if (factoryUsed != null) {
|
||||
return factoryUsed.getDirectory();
|
||||
} else {
|
||||
|
@ -333,8 +332,9 @@ public class PagingStoreImpl implements PagingStore {
|
|||
lock.readLock().lock();
|
||||
|
||||
try {
|
||||
if (currentPage != null) {
|
||||
currentPage.sync();
|
||||
final Page page = currentPage;
|
||||
if (page != null) {
|
||||
page.sync();
|
||||
}
|
||||
} finally {
|
||||
lock.readLock().unlock();
|
||||
|
@ -377,8 +377,9 @@ public class PagingStoreImpl implements PagingStore {
|
|||
|
||||
running = false;
|
||||
|
||||
if (currentPage != null) {
|
||||
currentPage.close(false);
|
||||
final Page page = currentPage;
|
||||
if (page != null) {
|
||||
page.close(false);
|
||||
currentPage = null;
|
||||
}
|
||||
}
|
||||
|
@ -415,11 +416,14 @@ public class PagingStoreImpl implements PagingStore {
|
|||
firstPageId = Integer.MAX_VALUE;
|
||||
|
||||
// There are no files yet on this Storage. We will just return it empty
|
||||
final SequentialFileFactory fileFactory = this.fileFactory;
|
||||
if (fileFactory != null) {
|
||||
|
||||
currentPageId = 0;
|
||||
if (currentPage != null) {
|
||||
currentPage.close(false);
|
||||
int pageId = 0;
|
||||
currentPageId = pageId;
|
||||
final Page oldPage = currentPage;
|
||||
if (oldPage != null) {
|
||||
oldPage.close(false);
|
||||
}
|
||||
currentPage = null;
|
||||
|
||||
|
@ -430,8 +434,8 @@ public class PagingStoreImpl implements PagingStore {
|
|||
for (String fileName : files) {
|
||||
final int fileId = PagingStoreImpl.getPageIdFromFileName(fileName);
|
||||
|
||||
if (fileId > currentPageId) {
|
||||
currentPageId = fileId;
|
||||
if (fileId > pageId) {
|
||||
pageId = fileId;
|
||||
}
|
||||
|
||||
if (fileId < firstPageId) {
|
||||
|
@ -439,13 +443,15 @@ public class PagingStoreImpl implements PagingStore {
|
|||
}
|
||||
}
|
||||
|
||||
if (currentPageId != 0) {
|
||||
currentPage = createPage(currentPageId);
|
||||
currentPage.open();
|
||||
currentPageId = pageId;
|
||||
|
||||
List<PagedMessage> messages = currentPage.read(storageManager);
|
||||
if (pageId != 0) {
|
||||
Page page = createPage(pageId);
|
||||
page.open();
|
||||
|
||||
LivePageCache pageCache = new LivePageCacheImpl(currentPageId);
|
||||
List<PagedMessage> messages = page.read(storageManager);
|
||||
|
||||
LivePageCache pageCache = new LivePageCacheImpl(pageId);
|
||||
|
||||
for (PagedMessage msg : messages) {
|
||||
pageCache.addLiveMessage(msg);
|
||||
|
@ -455,15 +461,18 @@ public class PagingStoreImpl implements PagingStore {
|
|||
}
|
||||
}
|
||||
|
||||
currentPage.setLiveCache(pageCache);
|
||||
page.setLiveCache(pageCache);
|
||||
|
||||
currentPageSize.set(currentPage.getSize());
|
||||
currentPageSize = page.getSize();
|
||||
|
||||
currentPage = page;
|
||||
|
||||
cursorProvider.addPageCache(pageCache);
|
||||
}
|
||||
|
||||
// We will not mark it for paging if there's only a single empty file
|
||||
if (currentPage != null && !(numberOfPages == 1 && currentPage.getSize() == 0)) {
|
||||
final Page page = currentPage;
|
||||
if (page != null && !(numberOfPages == 1 && page.getSize() == 0)) {
|
||||
startPaging();
|
||||
}
|
||||
}
|
||||
|
@ -539,12 +548,13 @@ public class PagingStoreImpl implements PagingStore {
|
|||
public boolean checkPageFileExists(final int pageNumber) {
|
||||
String fileName = createFileName(pageNumber);
|
||||
|
||||
SequentialFileFactory factory = null;
|
||||
try {
|
||||
checkFileFactory();
|
||||
factory = checkFileFactory();
|
||||
} catch (Exception ignored) {
|
||||
}
|
||||
|
||||
SequentialFile file = fileFactory.createSequentialFile(fileName);
|
||||
SequentialFile file = factory.createSequentialFile(fileName);
|
||||
return file.exists();
|
||||
}
|
||||
|
||||
|
@ -552,11 +562,11 @@ public class PagingStoreImpl implements PagingStore {
|
|||
public Page createPage(final int pageNumber) throws Exception {
|
||||
String fileName = createFileName(pageNumber);
|
||||
|
||||
checkFileFactory();
|
||||
SequentialFileFactory factory = checkFileFactory();
|
||||
|
||||
SequentialFile file = fileFactory.createSequentialFile(fileName);
|
||||
SequentialFile file = factory.createSequentialFile(fileName);
|
||||
|
||||
Page page = new Page(storeName, storageManager, fileFactory, file, pageNumber);
|
||||
Page page = new Page(storeName, storageManager, factory, file, pageNumber);
|
||||
|
||||
// To create the file
|
||||
file.open();
|
||||
|
@ -568,10 +578,13 @@ public class PagingStoreImpl implements PagingStore {
|
|||
return page;
|
||||
}
|
||||
|
||||
private void checkFileFactory() throws Exception {
|
||||
if (fileFactory == null) {
|
||||
fileFactory = storeFactory.newFileFactory(getStoreName());
|
||||
private SequentialFileFactory checkFileFactory() throws Exception {
|
||||
SequentialFileFactory factory = fileFactory;
|
||||
if (factory == null) {
|
||||
factory = storeFactory.newFileFactory(getStoreName());
|
||||
fileFactory = factory;
|
||||
}
|
||||
return factory;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -689,13 +702,13 @@ public class PagingStoreImpl implements PagingStore {
|
|||
pagingManager.addBlockedStore(this);
|
||||
}
|
||||
|
||||
if (!blocking.get()) {
|
||||
if (!blocking) {
|
||||
if (pagingManager.isDiskFull()) {
|
||||
ActiveMQServerLogger.LOGGER.blockingDiskFull(address);
|
||||
} else {
|
||||
ActiveMQServerLogger.LOGGER.blockingMessageProduction(address, sizeInBytes.get(), maxSize, pagingManager.getGlobalSize());
|
||||
}
|
||||
blocking.set(true);
|
||||
blocking = true;
|
||||
}
|
||||
}
|
||||
return true;
|
||||
|
@ -746,9 +759,9 @@ public class PagingStoreImpl implements PagingStore {
|
|||
if (!globalOversized && (newSize <= maxSize || maxSize < 0)) {
|
||||
if (!onMemoryFreedRunnables.isEmpty()) {
|
||||
executor.execute(this::memoryReleased);
|
||||
if (blocking.get()) {
|
||||
if (blocking) {
|
||||
ActiveMQServerLogger.LOGGER.unblockingMessageProduction(address, sizeInBytes.get(), maxSize);
|
||||
blocking.set(false);
|
||||
blocking = false;
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
@ -828,10 +841,11 @@ public class PagingStoreImpl implements PagingStore {
|
|||
|
||||
int bytesToWrite = pagedMessage.getEncodeSize() + Page.SIZE_RECORD;
|
||||
|
||||
if (currentPageSize.addAndGet(bytesToWrite) > pageSize && currentPage.getNumberOfMessages() > 0) {
|
||||
currentPageSize += bytesToWrite;
|
||||
if (currentPageSize > pageSize && currentPage.getNumberOfMessages() > 0) {
|
||||
// Make sure nothing is currently validating or using currentPage
|
||||
openNewPage();
|
||||
currentPageSize.addAndGet(bytesToWrite);
|
||||
currentPageSize += bytesToWrite;
|
||||
}
|
||||
|
||||
if (tx != null) {
|
||||
|
@ -842,17 +856,17 @@ public class PagingStoreImpl implements PagingStore {
|
|||
// especially on the case for non transactional sends and paging
|
||||
// doing this will give us a possibility of recovering the page counters
|
||||
long persistentSize = pagedMessage.getPersistentSize() > 0 ? pagedMessage.getPersistentSize() : 0;
|
||||
applyPageCounters(tx, getCurrentPage(), listCtx, persistentSize);
|
||||
final Page page = currentPage;
|
||||
applyPageCounters(tx, page, listCtx, persistentSize);
|
||||
|
||||
currentPage.write(pagedMessage);
|
||||
page.write(pagedMessage);
|
||||
|
||||
if (tx == null && syncNonTransactional && message.isDurable()) {
|
||||
sync();
|
||||
}
|
||||
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("Paging message " + pagedMessage + " on pageStore " + this.getStoreName() +
|
||||
" pageNr=" + currentPage.getPageId());
|
||||
logger.tracef("Paging message %s on pageStore %s pageNr=%d", pagedMessage, getStoreName(), page.getPageId());
|
||||
}
|
||||
|
||||
return true;
|
||||
|
@ -980,8 +994,9 @@ public class PagingStoreImpl implements PagingStore {
|
|||
|
||||
@Override
|
||||
public void destroy() throws Exception {
|
||||
if (fileFactory != null) {
|
||||
storeFactory.removeFileFactory(fileFactory);
|
||||
SequentialFileFactory factory = fileFactory;
|
||||
if (factory != null) {
|
||||
storeFactory.removeFileFactory(factory);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1079,32 +1094,35 @@ public class PagingStoreImpl implements PagingStore {
|
|||
try {
|
||||
numberOfPages++;
|
||||
|
||||
int tmpCurrentPageId = currentPageId + 1;
|
||||
final int newPageId = currentPageId + 1;
|
||||
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("new pageNr=" + tmpCurrentPageId, new Exception("trace"));
|
||||
logger.trace("new pageNr=" + newPageId, new Exception("trace"));
|
||||
}
|
||||
|
||||
if (currentPage != null) {
|
||||
currentPage.close(true);
|
||||
final Page oldPage = currentPage;
|
||||
if (oldPage != null) {
|
||||
oldPage.close(true);
|
||||
}
|
||||
|
||||
currentPage = createPage(tmpCurrentPageId);
|
||||
final Page newPage = createPage(newPageId);
|
||||
|
||||
LivePageCache pageCache = new LivePageCacheImpl(tmpCurrentPageId);
|
||||
currentPage = newPage;
|
||||
|
||||
currentPage.setLiveCache(pageCache);
|
||||
final LivePageCache pageCache = new LivePageCacheImpl(newPageId);
|
||||
|
||||
newPage.setLiveCache(pageCache);
|
||||
|
||||
cursorProvider.addPageCache(pageCache);
|
||||
|
||||
currentPageSize.set(0);
|
||||
currentPageSize = 0;
|
||||
|
||||
currentPage.open();
|
||||
newPage.open();
|
||||
|
||||
currentPageId = tmpCurrentPageId;
|
||||
currentPageId = newPageId;
|
||||
|
||||
if (currentPageId < firstPageId) {
|
||||
firstPageId = currentPageId;
|
||||
if (newPageId < firstPageId) {
|
||||
firstPageId = newPageId;
|
||||
}
|
||||
} finally {
|
||||
lock.writeLock().unlock();
|
||||
|
@ -1145,8 +1163,9 @@ public class PagingStoreImpl implements PagingStore {
|
|||
lock.writeLock().lock();
|
||||
try {
|
||||
List<Integer> ids = new ArrayList<>();
|
||||
if (fileFactory != null) {
|
||||
for (String fileName : fileFactory.listFiles("page")) {
|
||||
SequentialFileFactory factory = fileFactory;
|
||||
if (factory != null) {
|
||||
for (String fileName : factory.listFiles("page")) {
|
||||
ids.add(getPageIdFromFileName(fileName));
|
||||
}
|
||||
}
|
||||
|
@ -1158,8 +1177,9 @@ public class PagingStoreImpl implements PagingStore {
|
|||
|
||||
@Override
|
||||
public void sendPages(ReplicationManager replicator, Collection<Integer> pageIds) throws Exception {
|
||||
final SequentialFileFactory factory = fileFactory;
|
||||
for (Integer id : pageIds) {
|
||||
SequentialFile sFile = fileFactory.createSequentialFile(createFileName(id));
|
||||
SequentialFile sFile = factory.createSequentialFile(createFileName(id));
|
||||
if (!sFile.exists()) {
|
||||
continue;
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue