ARTEMIS-2321 PageCursorInfo caches number of msgs to save Page::read

This commit is contained in:
Francesco Nigro 2019-04-30 18:11:05 +02:00 committed by Clebert Suconic
parent b173bb5552
commit 30c82f43b0
1 changed files with 51 additions and 20 deletions

View File

@ -25,6 +25,7 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
@ -213,7 +214,7 @@ public final class PageSubscriptionImpl implements PageSubscription {
pageStore.getCurrentPage().getPageId() == position.getPageNr()) {
pageStore.forceAnotherPage();
}
PageCursorInfo info = new PageCursorInfo(position.getPageNr(), position.getMessageNr(), null);
PageCursorInfo info = new PageCursorInfo(position.getPageNr(), position.getMessageNr());
info.setCompleteInfo(position);
synchronized (consumedPages) {
consumedPages.put(Long.valueOf(position.getPageNr()), info);
@ -809,7 +810,8 @@ public final class PageSubscriptionImpl implements PageSubscription {
if (cache == null) {
return null;
}
pageInfo = new PageCursorInfo(pageNr, cache.getNumberOfMessages(), cache);
assert pageNr == cache.getPageId();
pageInfo = new PageCursorInfo(cache);
consumedPages.put(pageNr, pageInfo);
}
return pageInfo;
@ -822,7 +824,7 @@ public final class PageSubscriptionImpl implements PageSubscription {
synchronized (consumedPages) {
PageCursorInfo pageInfo = consumedPages.get(cache.getPageId());
if (pageInfo == null) {
consumedPages.put(cache.getPageId(), new PageCursorInfo(cache.getPageId(), cache.getNumberOfMessages(), cache));
consumedPages.put(cache.getPageId(), new PageCursorInfo(cache));
}
}
}
@ -950,7 +952,7 @@ public final class PageSubscriptionImpl implements PageSubscription {
public final class PageCursorInfo {
// Number of messages existent on this page
private final int numberOfMessages;
private int numberOfMessages;
private final long pageId;
@ -1008,15 +1010,30 @@ public final class PageSubscriptionImpl implements PageSubscription {
}
}
private PageCursorInfo(final long pageId, final int numberOfMessages, final PageCache cache) {
logger.tracef("Created PageCursorInfo for pageNr=%d, numberOfMessages=%d, cache=%s", pageId, numberOfMessages, cache);
private PageCursorInfo(final long pageId, final int numberOfMessages) {
if (numberOfMessages < 0) {
throw new IllegalStateException("numberOfMessages = " + numberOfMessages + " instead of being >=0");
}
this.pageId = pageId;
wasLive = false;
this.numberOfMessages = numberOfMessages;
if (cache != null) {
wasLive = cache.isLive();
this.cache = new WeakReference<>(cache);
logger.tracef("Created PageCursorInfo for pageNr=%d, numberOfMessages=%d, not live", pageId, numberOfMessages);
}
private PageCursorInfo(final PageCache cache) {
Objects.requireNonNull(cache);
this.pageId = cache.getPageId();
wasLive = cache.isLive();
this.cache = new WeakReference<>(cache);
if (!wasLive) {
final int numberOfMessages = cache.getNumberOfMessages();
assert numberOfMessages >= 0;
this.numberOfMessages = numberOfMessages;
logger.tracef("Created PageCursorInfo for pageNr=%d, numberOfMessages=%d, cache=%s, not live", pageId, this.numberOfMessages, cache);
} else {
wasLive = false;
//given that is live, the exact value must be get directly from cache
this.numberOfMessages = -1;
logger.tracef("Created PageCursorInfo for pageNr=%d, cache=%s, live", pageId, cache);
}
}
@ -1117,18 +1134,32 @@ public final class PageSubscriptionImpl implements PageSubscription {
}
}
private int getNumberOfMessages() {
if (wasLive) {
// if the page was live at any point, we need to
// get the number of messages from the page-cache
PageCache localcache = this.cache.get();
if (localcache == null) {
localcache = cursorProvider.getPageCache(pageId);
this.cache = new WeakReference<>(localcache);
}
private int getNumberOfMessagesFromPageCache() {
// if the page was live at any point, we need to
// get the number of messages from the page-cache
PageCache localCache = this.cache.get();
if (localCache == null) {
localCache = cursorProvider.getPageCache(pageId);
this.cache = new WeakReference<>(localCache);
}
int numberOfMessage = localCache.getNumberOfMessages();
if (!localCache.isLive()) {
//to avoid further "live" queries
this.numberOfMessages = numberOfMessage;
}
return numberOfMessage;
}
return localcache.getNumberOfMessages();
private int getNumberOfMessages() {
final int numberOfMessages = this.numberOfMessages;
if (wasLive) {
if (numberOfMessages < 0) {
return getNumberOfMessagesFromPageCache();
} else {
return numberOfMessages;
}
} else {
assert numberOfMessages >= 0;
return numberOfMessages;
}
}