This commit is contained in:
Clebert Suconic 2019-05-13 10:24:18 -04:00
commit b25afc9efa
8 changed files with 245 additions and 170 deletions

View File

@ -21,26 +21,26 @@ import java.lang.ref.SoftReference;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicLong;
import io.netty.util.collection.LongObjectHashMap;
import io.netty.util.collection.LongObjectMap;
import org.jboss.logging.Logger;
public class SoftValueHashMap<K, V extends SoftValueHashMap.ValueCache> implements Map<K, V> {
public class SoftValueLongObjectHashMap<V extends SoftValueLongObjectHashMap.ValueCache> implements LongObjectMap<V> {
private static final Logger logger = Logger.getLogger(SoftValueHashMap.class);
private static final Logger logger = Logger.getLogger(SoftValueLongObjectHashMap.class);
// The soft references that are already good.
// too bad there's no way to override the queue method on ReferenceQueue, so I wouldn't need this
private final ReferenceQueue<V> refQueue = new ReferenceQueue<>();
private final Map<K, AggregatedSoftReference> mapDelegate = new HashMap<>();
private final LongObjectMap<AggregatedSoftReference<V>> mapDelegate = new LongObjectHashMap<>();
private final AtomicLong usedCounter = new AtomicLong(0);
private long usedCounter = 0;
private int maxElements;
@ -57,7 +57,7 @@ public class SoftValueHashMap<K, V extends SoftValueHashMap.ValueCache> implemen
// Constructors --------------------------------------------------
public SoftValueHashMap(final int maxElements) {
public SoftValueLongObjectHashMap(final int maxElements) {
this.maxElements = maxElements;
}
@ -96,8 +96,7 @@ public class SoftValueHashMap<K, V extends SoftValueHashMap.ValueCache> implemen
*/
@Override
public boolean containsKey(final Object key) {
processQueue();
return mapDelegate.containsKey(key);
return containsKey(objectToKey(key));
}
/**
@ -107,7 +106,7 @@ public class SoftValueHashMap<K, V extends SoftValueHashMap.ValueCache> implemen
@Override
public boolean containsValue(final Object value) {
processQueue();
for (AggregatedSoftReference valueIter : mapDelegate.values()) {
for (AggregatedSoftReference<V> valueIter : mapDelegate.values()) {
V valueElement = valueIter.get();
if (valueElement != null && value.equals(valueElement)) {
return true;
@ -117,16 +116,31 @@ public class SoftValueHashMap<K, V extends SoftValueHashMap.ValueCache> implemen
return false;
}
private static long objectToKey(Object key) {
return ((Long) key).longValue();
}
/**
* @param key
* @see java.util.Map#get(java.lang.Object)
*/
@Override
public V get(final Object key) {
return get(objectToKey(key));
}
@Override
public V put(Long key, V value) {
return put(objectToKey(key), value);
}
@Override
public V get(long key) {
processQueue();
AggregatedSoftReference value = mapDelegate.get(key);
AggregatedSoftReference<V> value = mapDelegate.get(key);
if (value != null) {
value.used();
usedCounter++;
value.used(usedCounter);
return value.get();
} else {
return null;
@ -139,12 +153,13 @@ public class SoftValueHashMap<K, V extends SoftValueHashMap.ValueCache> implemen
* @see java.util.Map#put(java.lang.Object, java.lang.Object)
*/
@Override
public V put(final K key, final V value) {
public V put(final long key, final V value) {
processQueue();
AggregatedSoftReference newRef = createReference(key, value);
AggregatedSoftReference oldRef = mapDelegate.put(key, newRef);
AggregatedSoftReference<V> newRef = createReference(key, value);
AggregatedSoftReference<V> oldRef = mapDelegate.put(key, newRef);
checkCacheSize();
newRef.used();
usedCounter++;
newRef.used(usedCounter);
if (oldRef != null) {
return oldRef.get();
} else {
@ -152,11 +167,22 @@ public class SoftValueHashMap<K, V extends SoftValueHashMap.ValueCache> implemen
}
}
@Override
public V remove(long key) {
processQueue();
AggregatedSoftReference<V> ref = mapDelegate.remove(key);
if (ref != null) {
return ref.get();
} else {
return null;
}
}
private void checkCacheSize() {
if (maxElements > 0 && mapDelegate.size() > maxElements) {
TreeSet<AggregatedSoftReference> usedReferences = new TreeSet<>(new ComparatorAgregated());
for (AggregatedSoftReference ref : mapDelegate.values()) {
for (AggregatedSoftReference<V> ref : mapDelegate.values()) {
V v = ref.get();
if (v != null && !v.isLive()) {
@ -169,7 +195,7 @@ public class SoftValueHashMap<K, V extends SoftValueHashMap.ValueCache> implemen
Object removed = mapDelegate.remove(ref.key);
if (logger.isTraceEnabled()) {
logger.trace("Removing " + removed + " with id = " + ref.key + " from SoftValueHashMap");
logger.trace("Removing " + removed + " with id = " + ref.key + " from SoftValueLongObjectHashMap");
}
if (mapDelegate.size() <= maxElements) {
@ -210,13 +236,7 @@ public class SoftValueHashMap<K, V extends SoftValueHashMap.ValueCache> implemen
*/
@Override
public V remove(final Object key) {
processQueue();
AggregatedSoftReference ref = mapDelegate.remove(key);
if (ref != null) {
return ref.get();
} else {
return null;
}
return remove(objectToKey(key));
}
/**
@ -224,10 +244,17 @@ public class SoftValueHashMap<K, V extends SoftValueHashMap.ValueCache> implemen
* @see java.util.Map#putAll(java.util.Map)
*/
@Override
public void putAll(final Map<? extends K, ? extends V> m) {
public void putAll(final Map<? extends Long, ? extends V> m) {
processQueue();
for (Map.Entry<? extends K, ? extends V> e : m.entrySet()) {
put(e.getKey(), e.getValue());
if (m instanceof LongObjectMap) {
final LongObjectMap<? extends V> primitiveMap = (LongObjectMap<? extends V>) m;
for (PrimitiveEntry<? extends V> entry : primitiveMap.entries()) {
put(entry.key(), entry.value());
}
} else {
for (Map.Entry<? extends Long, ? extends V> e : m.entrySet()) {
put(e.getKey(), e.getValue());
}
}
}
@ -243,7 +270,7 @@ public class SoftValueHashMap<K, V extends SoftValueHashMap.ValueCache> implemen
* @see java.util.Map#keySet()
*/
@Override
public Set<K> keySet() {
public Set<Long> keySet() {
processQueue();
return mapDelegate.keySet();
}
@ -256,7 +283,7 @@ public class SoftValueHashMap<K, V extends SoftValueHashMap.ValueCache> implemen
processQueue();
ArrayList<V> list = new ArrayList<>();
for (AggregatedSoftReference refs : mapDelegate.values()) {
for (AggregatedSoftReference<V> refs : mapDelegate.values()) {
V value = refs.get();
if (value != null) {
list.add(value);
@ -266,20 +293,29 @@ public class SoftValueHashMap<K, V extends SoftValueHashMap.ValueCache> implemen
return list;
}
/**
* @see java.util.Map#entrySet()
*/
@Override
public Set<java.util.Map.Entry<K, V>> entrySet() {
public Set<Entry<Long, V>> entrySet() {
return null;
}
@Override
public Iterable<PrimitiveEntry<V>> entries() {
processQueue();
HashSet<Map.Entry<K, V>> set = new HashSet<>();
for (Map.Entry<K, AggregatedSoftReference> pair : mapDelegate.entrySet()) {
V value = pair.getValue().get();
final int size = mapDelegate.size();
final List<PrimitiveEntry<V>> entries = new ArrayList<>(size);
for (PrimitiveEntry<AggregatedSoftReference<V>> pair : mapDelegate.entries()) {
V value = pair.value().get();
if (value != null) {
set.add(new EntryElement<>(pair.getKey(), value));
entries.add(new EntryElement<>(pair.key(), value));
}
}
return set;
return entries;
}
@Override
public boolean containsKey(long key) {
processQueue();
return mapDelegate.containsKey(key);
}
/**
@ -315,15 +351,15 @@ public class SoftValueHashMap<K, V extends SoftValueHashMap.ValueCache> implemen
}
}
private AggregatedSoftReference createReference(final K key, final V value) {
return new AggregatedSoftReference(key, value);
private AggregatedSoftReference createReference(final long key, final V value) {
return new AggregatedSoftReference(key, value, refQueue);
}
// Inner classes -------------------------------------------------
class AggregatedSoftReference extends SoftReference<V> {
static final class AggregatedSoftReference<V> extends SoftReference<V> {
final K key;
final long key;
long used = 0;
@ -331,11 +367,11 @@ public class SoftValueHashMap<K, V extends SoftValueHashMap.ValueCache> implemen
return used;
}
public void used() {
used = usedCounter.incrementAndGet();
public void used(long value) {
this.used = value;
}
AggregatedSoftReference(final K key, final V referent) {
AggregatedSoftReference(final long key, final V referent, ReferenceQueue<V> refQueue) {
super(referent, refQueue);
this.key = key;
}
@ -346,40 +382,31 @@ public class SoftValueHashMap<K, V extends SoftValueHashMap.ValueCache> implemen
}
}
static final class EntryElement<K, V> implements Map.Entry<K, V> {
static final class EntryElement<V> implements LongObjectMap.PrimitiveEntry<V> {
final K key;
final long key;
volatile V value;
final V value;
EntryElement(final K key, final V value) {
EntryElement(final long key, final V value) {
this.key = key;
this.value = value;
}
/* (non-Javadoc)
* @see java.util.Map.Entry#getKey()
*/
@Override
public K getKey() {
public long key() {
return key;
}
/* (non-Javadoc)
* @see java.util.Map.Entry#getValue()
*/
@Override
public V getValue() {
public V value() {
return value;
}
/* (non-Javadoc)
* @see java.util.Map.Entry#setValue(java.lang.Object)
*/
@Override
public V setValue(final V value) {
this.value = value;
return value;
@Deprecated
public void setValue(V value) {
throw new UnsupportedOperationException();
}
}

View File

@ -17,9 +17,9 @@
package org.apache.activemq.artemis.core.paging.cursor;
import org.apache.activemq.artemis.core.paging.PagedMessage;
import org.apache.activemq.artemis.utils.SoftValueHashMap;
import org.apache.activemq.artemis.utils.SoftValueLongObjectHashMap;
public interface PageCache extends SoftValueHashMap.ValueCache {
public interface PageCache extends SoftValueLongObjectHashMap.ValueCache {
long getPageId();

View File

@ -18,7 +18,6 @@ package org.apache.activemq.artemis.core.paging.cursor.impl;
import org.apache.activemq.artemis.core.paging.PagedMessage;
import org.apache.activemq.artemis.core.paging.cursor.LivePageCache;
import org.apache.activemq.artemis.core.paging.impl.Page;
import org.apache.activemq.artemis.core.server.LargeServerMessage;
import org.apache.activemq.artemis.utils.collections.ConcurrentAppendOnlyChunkedList;
import org.jboss.logging.Logger;
@ -34,18 +33,18 @@ public final class LivePageCacheImpl implements LivePageCache {
private final ConcurrentAppendOnlyChunkedList<PagedMessage> messages;
private final Page page;
private final long pageId;
private volatile boolean isLive = true;
public LivePageCacheImpl(final Page page) {
this.page = page;
public LivePageCacheImpl(final long pageId) {
this.pageId = pageId;
this.messages = new ConcurrentAppendOnlyChunkedList<>(CHUNK_SIZE);
}
@Override
public long getPageId() {
return page.getPageId();
return pageId;
}
@Override
@ -92,6 +91,6 @@ public final class LivePageCacheImpl implements LivePageCache {
@Override
public String toString() {
return "LivePacheCacheImpl::page=" + page.getPageId() + " number of messages=" + getNumberOfMessages() + " isLive = " + isLive;
return "LivePacheCacheImpl::page=" + pageId + " number of messages=" + getNumberOfMessages() + " isLive = " + isLive;
}
}

View File

@ -18,7 +18,6 @@ package org.apache.activemq.artemis.core.paging.cursor.impl;
import org.apache.activemq.artemis.core.paging.PagedMessage;
import org.apache.activemq.artemis.core.paging.cursor.PageCache;
import org.apache.activemq.artemis.core.paging.impl.Page;
/**
* The caching associated to a single page.
@ -31,14 +30,14 @@ class PageCacheImpl implements PageCache {
private PagedMessage[] messages;
private final Page page;
private final long pageId;
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
PageCacheImpl(final Page page) {
this.page = page;
PageCacheImpl(final long pageId) {
this.pageId = pageId;
}
// Public --------------------------------------------------------
@ -54,7 +53,7 @@ class PageCacheImpl implements PageCache {
@Override
public long getPageId() {
return page.getPageId();
return pageId;
}
@Override
@ -78,7 +77,7 @@ class PageCacheImpl implements PageCache {
@Override
public String toString() {
return "PageCacheImpl::page=" + page.getPageId() + " numberOfMessages = " + messages.length;
return "PageCacheImpl::page=" + pageId + " numberOfMessages = " + messages.length;
}
@Override

View File

@ -19,8 +19,6 @@ package org.apache.activemq.artemis.core.paging.cursor.impl;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@ -39,8 +37,9 @@ import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
import org.apache.activemq.artemis.utils.SoftValueHashMap;
import org.apache.activemq.artemis.utils.SoftValueLongObjectHashMap;
import org.apache.activemq.artemis.utils.actors.ArtemisExecutor;
import org.apache.activemq.artemis.utils.collections.ConcurrentLongHashMap;
import org.jboss.logging.Logger;
/**
@ -70,9 +69,9 @@ public class PageCursorProviderImpl implements PageCursorProvider {
// This is the same executor used at the PageStoreImpl. One Executor per pageStore
private final ArtemisExecutor executor;
private final SoftValueHashMap<Long, PageCache> softCache;
private final SoftValueLongObjectHashMap<PageCache> softCache;
private final ConcurrentMap<Long, PageSubscription> activeCursors = new ConcurrentHashMap<>();
private final ConcurrentLongHashMap<PageSubscription> activeCursors = new ConcurrentLongHashMap<>();
// Static --------------------------------------------------------
@ -85,7 +84,7 @@ public class PageCursorProviderImpl implements PageCursorProvider {
this.pagingStore = pagingStore;
this.storageManager = storageManager;
this.executor = executor;
this.softCache = new SoftValueHashMap<>(maxCacheSize);
this.softCache = new SoftValueLongObjectHashMap<>(maxCacheSize);
}
// Public --------------------------------------------------------
@ -567,8 +566,8 @@ public class PageCursorProviderImpl implements PageCursorProvider {
// Protected -----------------------------------------------------
/* Protected as we may let test cases to instrument the test */
protected PageCacheImpl createPageCache(final long pageId) throws Exception {
return new PageCacheImpl(pagingStore.createPage((int) pageId));
protected PageCacheImpl createPageCache(final long pageId) {
return new PageCacheImpl(pageId);
}
// Private -------------------------------------------------------

View File

@ -25,6 +25,7 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
@ -213,7 +214,7 @@ public final class PageSubscriptionImpl implements PageSubscription {
pageStore.getCurrentPage().getPageId() == position.getPageNr()) {
pageStore.forceAnotherPage();
}
PageCursorInfo info = new PageCursorInfo(position.getPageNr(), position.getMessageNr(), null);
PageCursorInfo info = new PageCursorInfo(position.getPageNr(), position.getMessageNr());
info.setCompleteInfo(position);
synchronized (consumedPages) {
consumedPages.put(Long.valueOf(position.getPageNr()), info);
@ -809,7 +810,8 @@ public final class PageSubscriptionImpl implements PageSubscription {
if (cache == null) {
return null;
}
pageInfo = new PageCursorInfo(pageNr, cache.getNumberOfMessages(), cache);
assert pageNr == cache.getPageId();
pageInfo = new PageCursorInfo(cache);
consumedPages.put(pageNr, pageInfo);
}
return pageInfo;
@ -822,7 +824,7 @@ public final class PageSubscriptionImpl implements PageSubscription {
synchronized (consumedPages) {
PageCursorInfo pageInfo = consumedPages.get(cache.getPageId());
if (pageInfo == null) {
consumedPages.put(cache.getPageId(), new PageCursorInfo(cache.getPageId(), cache.getNumberOfMessages(), cache));
consumedPages.put(cache.getPageId(), new PageCursorInfo(cache));
}
}
}
@ -950,7 +952,7 @@ public final class PageSubscriptionImpl implements PageSubscription {
public final class PageCursorInfo {
// Number of messages existent on this page
private final int numberOfMessages;
private int numberOfMessages;
private final long pageId;
@ -1008,15 +1010,30 @@ public final class PageSubscriptionImpl implements PageSubscription {
}
}
private PageCursorInfo(final long pageId, final int numberOfMessages, final PageCache cache) {
logger.tracef("Created PageCursorInfo for pageNr=%d, numberOfMessages=%d, cache=%s", pageId, numberOfMessages, cache);
private PageCursorInfo(final long pageId, final int numberOfMessages) {
if (numberOfMessages < 0) {
throw new IllegalStateException("numberOfMessages = " + numberOfMessages + " instead of being >=0");
}
this.pageId = pageId;
wasLive = false;
this.numberOfMessages = numberOfMessages;
if (cache != null) {
wasLive = cache.isLive();
this.cache = new WeakReference<>(cache);
logger.tracef("Created PageCursorInfo for pageNr=%d, numberOfMessages=%d, not live", pageId, numberOfMessages);
}
private PageCursorInfo(final PageCache cache) {
Objects.requireNonNull(cache);
this.pageId = cache.getPageId();
wasLive = cache.isLive();
this.cache = new WeakReference<>(cache);
if (!wasLive) {
final int numberOfMessages = cache.getNumberOfMessages();
assert numberOfMessages >= 0;
this.numberOfMessages = numberOfMessages;
logger.tracef("Created PageCursorInfo for pageNr=%d, numberOfMessages=%d, cache=%s, not live", pageId, this.numberOfMessages, cache);
} else {
wasLive = false;
//given that is live, the exact value must be get directly from cache
this.numberOfMessages = -1;
logger.tracef("Created PageCursorInfo for pageNr=%d, cache=%s, live", pageId, cache);
}
}
@ -1117,18 +1134,32 @@ public final class PageSubscriptionImpl implements PageSubscription {
}
}
private int getNumberOfMessages() {
if (wasLive) {
// if the page was live at any point, we need to
// get the number of messages from the page-cache
PageCache localcache = this.cache.get();
if (localcache == null) {
localcache = cursorProvider.getPageCache(pageId);
this.cache = new WeakReference<>(localcache);
}
private int getNumberOfMessagesFromPageCache() {
// if the page was live at any point, we need to
// get the number of messages from the page-cache
PageCache localCache = this.cache.get();
if (localCache == null) {
localCache = cursorProvider.getPageCache(pageId);
this.cache = new WeakReference<>(localCache);
}
int numberOfMessage = localCache.getNumberOfMessages();
if (!localCache.isLive()) {
//to avoid further "live" queries
this.numberOfMessages = numberOfMessage;
}
return numberOfMessage;
}
return localcache.getNumberOfMessages();
private int getNumberOfMessages() {
final int numberOfMessages = this.numberOfMessages;
if (wasLive) {
if (numberOfMessages < 0) {
return getNumberOfMessagesFromPageCache();
} else {
return numberOfMessages;
}
} else {
assert numberOfMessages >= 0;
return numberOfMessages;
}
}

View File

@ -28,8 +28,6 @@ import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@ -78,7 +76,8 @@ public class PagingStoreImpl implements PagingStore {
private final DecimalFormat format = new DecimalFormat("000000000");
private final AtomicInteger currentPageSize = new AtomicInteger(0);
//it's being guarded by lock.writeLock().lock() and never read concurrently
private int currentPageSize = 0;
private final SimpleString storeName;
@ -125,7 +124,7 @@ public class PagingStoreImpl implements PagingStore {
private final boolean syncNonTransactional;
private volatile AtomicBoolean blocking = new AtomicBoolean(false);
private volatile boolean blocking = false;
private long rejectThreshold;
@ -280,7 +279,7 @@ public class PagingStoreImpl implements PagingStore {
@Override
public File getFolder() {
SequentialFileFactory factoryUsed = this.fileFactory;
final SequentialFileFactory factoryUsed = this.fileFactory;
if (factoryUsed != null) {
return factoryUsed.getDirectory();
} else {
@ -333,8 +332,9 @@ public class PagingStoreImpl implements PagingStore {
lock.readLock().lock();
try {
if (currentPage != null) {
currentPage.sync();
final Page page = currentPage;
if (page != null) {
page.sync();
}
} finally {
lock.readLock().unlock();
@ -377,8 +377,9 @@ public class PagingStoreImpl implements PagingStore {
running = false;
if (currentPage != null) {
currentPage.close(false);
final Page page = currentPage;
if (page != null) {
page.close(false);
currentPage = null;
}
}
@ -415,11 +416,14 @@ public class PagingStoreImpl implements PagingStore {
firstPageId = Integer.MAX_VALUE;
// There are no files yet on this Storage. We will just return it empty
final SequentialFileFactory fileFactory = this.fileFactory;
if (fileFactory != null) {
currentPageId = 0;
if (currentPage != null) {
currentPage.close(false);
int pageId = 0;
currentPageId = pageId;
final Page oldPage = currentPage;
if (oldPage != null) {
oldPage.close(false);
}
currentPage = null;
@ -430,8 +434,8 @@ public class PagingStoreImpl implements PagingStore {
for (String fileName : files) {
final int fileId = PagingStoreImpl.getPageIdFromFileName(fileName);
if (fileId > currentPageId) {
currentPageId = fileId;
if (fileId > pageId) {
pageId = fileId;
}
if (fileId < firstPageId) {
@ -439,13 +443,15 @@ public class PagingStoreImpl implements PagingStore {
}
}
if (currentPageId != 0) {
currentPage = createPage(currentPageId);
currentPage.open();
currentPageId = pageId;
List<PagedMessage> messages = currentPage.read(storageManager);
if (pageId != 0) {
Page page = createPage(pageId);
page.open();
LivePageCache pageCache = new LivePageCacheImpl(currentPage);
List<PagedMessage> messages = page.read(storageManager);
LivePageCache pageCache = new LivePageCacheImpl(pageId);
for (PagedMessage msg : messages) {
pageCache.addLiveMessage(msg);
@ -455,15 +461,18 @@ public class PagingStoreImpl implements PagingStore {
}
}
currentPage.setLiveCache(pageCache);
page.setLiveCache(pageCache);
currentPageSize.set(currentPage.getSize());
currentPageSize = page.getSize();
currentPage = page;
cursorProvider.addPageCache(pageCache);
}
// We will not mark it for paging if there's only a single empty file
if (currentPage != null && !(numberOfPages == 1 && currentPage.getSize() == 0)) {
final Page page = currentPage;
if (page != null && !(numberOfPages == 1 && page.getSize() == 0)) {
startPaging();
}
}
@ -539,12 +548,13 @@ public class PagingStoreImpl implements PagingStore {
public boolean checkPageFileExists(final int pageNumber) {
String fileName = createFileName(pageNumber);
SequentialFileFactory factory = null;
try {
checkFileFactory();
factory = checkFileFactory();
} catch (Exception ignored) {
}
SequentialFile file = fileFactory.createSequentialFile(fileName);
SequentialFile file = factory.createSequentialFile(fileName);
return file.exists();
}
@ -552,11 +562,11 @@ public class PagingStoreImpl implements PagingStore {
public Page createPage(final int pageNumber) throws Exception {
String fileName = createFileName(pageNumber);
checkFileFactory();
SequentialFileFactory factory = checkFileFactory();
SequentialFile file = fileFactory.createSequentialFile(fileName);
SequentialFile file = factory.createSequentialFile(fileName);
Page page = new Page(storeName, storageManager, fileFactory, file, pageNumber);
Page page = new Page(storeName, storageManager, factory, file, pageNumber);
// To create the file
file.open();
@ -568,10 +578,13 @@ public class PagingStoreImpl implements PagingStore {
return page;
}
private void checkFileFactory() throws Exception {
if (fileFactory == null) {
fileFactory = storeFactory.newFileFactory(getStoreName());
private SequentialFileFactory checkFileFactory() throws Exception {
SequentialFileFactory factory = fileFactory;
if (factory == null) {
factory = storeFactory.newFileFactory(getStoreName());
fileFactory = factory;
}
return factory;
}
@Override
@ -689,13 +702,13 @@ public class PagingStoreImpl implements PagingStore {
pagingManager.addBlockedStore(this);
}
if (!blocking.get()) {
if (!blocking) {
if (pagingManager.isDiskFull()) {
ActiveMQServerLogger.LOGGER.blockingDiskFull(address);
} else {
ActiveMQServerLogger.LOGGER.blockingMessageProduction(address, sizeInBytes.get(), maxSize, pagingManager.getGlobalSize());
}
blocking.set(true);
blocking = true;
}
}
return true;
@ -746,9 +759,9 @@ public class PagingStoreImpl implements PagingStore {
if (!globalOversized && (newSize <= maxSize || maxSize < 0)) {
if (!onMemoryFreedRunnables.isEmpty()) {
executor.execute(this::memoryReleased);
if (blocking.get()) {
if (blocking) {
ActiveMQServerLogger.LOGGER.unblockingMessageProduction(address, sizeInBytes.get(), maxSize);
blocking.set(false);
blocking = false;
return true;
}
}
@ -828,10 +841,11 @@ public class PagingStoreImpl implements PagingStore {
int bytesToWrite = pagedMessage.getEncodeSize() + Page.SIZE_RECORD;
if (currentPageSize.addAndGet(bytesToWrite) > pageSize && currentPage.getNumberOfMessages() > 0) {
currentPageSize += bytesToWrite;
if (currentPageSize > pageSize && currentPage.getNumberOfMessages() > 0) {
// Make sure nothing is currently validating or using currentPage
openNewPage();
currentPageSize.addAndGet(bytesToWrite);
currentPageSize += bytesToWrite;
}
if (tx != null) {
@ -842,17 +856,17 @@ public class PagingStoreImpl implements PagingStore {
// especially on the case for non transactional sends and paging
// doing this will give us a possibility of recovering the page counters
long persistentSize = pagedMessage.getPersistentSize() > 0 ? pagedMessage.getPersistentSize() : 0;
applyPageCounters(tx, getCurrentPage(), listCtx, persistentSize);
final Page page = currentPage;
applyPageCounters(tx, page, listCtx, persistentSize);
currentPage.write(pagedMessage);
page.write(pagedMessage);
if (tx == null && syncNonTransactional && message.isDurable()) {
sync();
}
if (logger.isTraceEnabled()) {
logger.trace("Paging message " + pagedMessage + " on pageStore " + this.getStoreName() +
" pageNr=" + currentPage.getPageId());
logger.tracef("Paging message %s on pageStore %s pageNr=%d", pagedMessage, getStoreName(), page.getPageId());
}
return true;
@ -980,8 +994,9 @@ public class PagingStoreImpl implements PagingStore {
@Override
public void destroy() throws Exception {
if (fileFactory != null) {
storeFactory.removeFileFactory(fileFactory);
SequentialFileFactory factory = fileFactory;
if (factory != null) {
storeFactory.removeFileFactory(factory);
}
}
@ -1079,32 +1094,35 @@ public class PagingStoreImpl implements PagingStore {
try {
numberOfPages++;
int tmpCurrentPageId = currentPageId + 1;
final int newPageId = currentPageId + 1;
if (logger.isTraceEnabled()) {
logger.trace("new pageNr=" + tmpCurrentPageId, new Exception("trace"));
logger.trace("new pageNr=" + newPageId, new Exception("trace"));
}
if (currentPage != null) {
currentPage.close(true);
final Page oldPage = currentPage;
if (oldPage != null) {
oldPage.close(true);
}
currentPage = createPage(tmpCurrentPageId);
final Page newPage = createPage(newPageId);
LivePageCache pageCache = new LivePageCacheImpl(currentPage);
currentPage = newPage;
currentPage.setLiveCache(pageCache);
final LivePageCache pageCache = new LivePageCacheImpl(newPageId);
newPage.setLiveCache(pageCache);
cursorProvider.addPageCache(pageCache);
currentPageSize.set(0);
currentPageSize = 0;
currentPage.open();
newPage.open();
currentPageId = tmpCurrentPageId;
currentPageId = newPageId;
if (currentPageId < firstPageId) {
firstPageId = currentPageId;
if (newPageId < firstPageId) {
firstPageId = newPageId;
}
} finally {
lock.writeLock().unlock();
@ -1145,8 +1163,9 @@ public class PagingStoreImpl implements PagingStore {
lock.writeLock().lock();
try {
List<Integer> ids = new ArrayList<>();
if (fileFactory != null) {
for (String fileName : fileFactory.listFiles("page")) {
SequentialFileFactory factory = fileFactory;
if (factory != null) {
for (String fileName : factory.listFiles("page")) {
ids.add(getPageIdFromFileName(fileName));
}
}
@ -1158,8 +1177,9 @@ public class PagingStoreImpl implements PagingStore {
@Override
public void sendPages(ReplicationManager replicator, Collection<Integer> pageIds) throws Exception {
final SequentialFileFactory factory = fileFactory;
for (Integer id : pageIds) {
SequentialFile sFile = fileFactory.createSequentialFile(createFileName(id));
SequentialFile sFile = factory.createSequentialFile(createFileName(id));
if (!sFile.exists()) {
continue;
}

View File

@ -17,7 +17,7 @@
package org.apache.activemq.artemis.tests.unit.util;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.utils.SoftValueHashMap;
import org.apache.activemq.artemis.utils.SoftValueLongObjectHashMap;
import org.junit.Test;
public class SoftValueMapTest extends ActiveMQTestBase {
@ -40,7 +40,7 @@ public class SoftValueMapTest extends ActiveMQTestBase {
// each buffer will be 1/10th of the maxMemory
int bufferSize = (int) (maxMemory / 100);
SoftValueHashMap<Long, Value> softCache = new SoftValueHashMap<>(100);
SoftValueLongObjectHashMap<Value> softCache = new SoftValueLongObjectHashMap<>(100);
final int MAX_ELEMENTS = 1000;
@ -59,7 +59,7 @@ public class SoftValueMapTest extends ActiveMQTestBase {
public void testEvictionsLeastUsed() {
forceGC();
SoftValueHashMap<Long, Value> softCache = new SoftValueHashMap<>(200);
SoftValueLongObjectHashMap<Value> softCache = new SoftValueLongObjectHashMap<>(200);
for (long i = 0; i < 100; i++) {
Value v = new Value(new byte[1]);
@ -99,7 +99,7 @@ public class SoftValueMapTest extends ActiveMQTestBase {
Value two = new Value(new byte[100]);
Value three = new Value(new byte[100]);
SoftValueHashMap<Integer, Value> softCache = new SoftValueHashMap<>(2);
SoftValueLongObjectHashMap<Value> softCache = new SoftValueLongObjectHashMap<>(2);
softCache.put(3, three);
softCache.put(2, two);
softCache.put(1, one);
@ -110,7 +110,7 @@ public class SoftValueMapTest extends ActiveMQTestBase {
}
class Value implements SoftValueHashMap.ValueCache {
class Value implements SoftValueLongObjectHashMap.ValueCache {
byte[] payload;
@ -121,7 +121,7 @@ public class SoftValueMapTest extends ActiveMQTestBase {
}
/* (non-Javadoc)
* @see org.apache.activemq.artemis.utils.SoftValueHashMap.ValueCache#isLive()
* @see org.apache.activemq.artemis.utils.SoftValueLongObjectHashMap.ValueCache#isLive()
*/
@Override
public boolean isLive() {