ARTEMIS-4056 Paging Management optimizations

- optimize startup time on paging (check-depage on startup)
- otpimize getNextPage() on complete pages
- optimize getFirstMessage() and paging. (avoid iterator usage)
This commit is contained in:
Clebert Suconic 2022-10-16 17:15:42 -04:00 committed by clebertsuconic
parent fbc6dd066b
commit b388a24b26
12 changed files with 157 additions and 29 deletions

View File

@ -31,6 +31,10 @@ public class EmptyList<E> implements LinkedList<E> {
private EmptyList() {
}
@Override
public E peek() {
return null;
}
@Override
public void addHead(E e) {

View File

@ -28,6 +28,8 @@ public interface LinkedList<E> {
E poll();
E peek();
LinkedListIterator<E> iterator();
void clear();

View File

@ -103,6 +103,16 @@ public class LinkedListImpl<E> implements LinkedList<E> {
size++;
}
@Override
public E peek() {
Node<E> current = head.next;
if (current == null) {
return null;
} else {
return current.val();
}
}
@Override
public E get(int position) {
Node<E> current = head.next;

View File

@ -31,6 +31,9 @@ public interface PriorityLinkedList<E> {
E poll();
/** just look at the first element on the list */
E peek();
void clear();
/**

View File

@ -126,6 +126,17 @@ public class PriorityLinkedListImpl<E> implements PriorityLinkedList<E> {
return null;
}
@Override
public E peek() {
for (LinkedListImpl<E> level : levels) {
E value = level.peek();
if (value != null) {
return value;
}
}
return null;
}
@Override
public E poll() {

View File

@ -874,7 +874,13 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
}
}
protected Map<String, Object>[] getFirstMessage() throws Exception {
/**
* this method returns a Map representing the first message.
* or null if there's no first message.
* @return
* @throws Exception
*/
protected Map<String, Object> getFirstMessage() throws Exception {
if (AuditLogger.isBaseLoggingEnabled()) {
AuditLogger.getFirstMessage(queue);
}
@ -883,16 +889,12 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
clearIO();
try {
List<Map<String, Object>> messages = new ArrayList<>();
queue.flushExecutor();
final int attributeSizeLimit = addressSettingsRepository.getMatch(address).getManagementMessageAttributeSizeLimit();
try (LinkedListIterator<MessageReference> iterator = queue.browserIterator()) {
// returns just the first, as it's the first only
if (iterator.hasNext()) {
MessageReference ref = iterator.next();
Message message = ref.getMessage();
messages.add(message.toMap(attributeSizeLimit));
}
return messages.toArray(new Map[1]);
MessageReference firstMessage = queue.peekFirstMessage();
if (firstMessage != null) {
return firstMessage.getMessage().toMap(attributeSizeLimit);
} else {
return null;
}
} finally {
blockOnIO();
@ -905,7 +907,8 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
if (AuditLogger.isBaseLoggingEnabled()) {
AuditLogger.getFirstMessageAsJSON(queue);
}
return toJSON(getFirstMessage());
Map<String, Object> message = getFirstMessage();
return toJSON(message == null ? new Map[0] : new Map[]{message});
}
@Override
@ -914,15 +917,16 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
AuditLogger.getFirstMessageTimestamp(queue);
}
Map<String, Object>[] _message = getFirstMessage();
if (_message == null || _message.length == 0 || _message[0] == null) {
Map<String, Object> message = getFirstMessage();
if (message == null) {
return null;
} else {
if (!message.containsKey("timestamp")) {
return null;
} else {
return (Long) message.get("timestamp");
}
}
Map<String, Object> message = _message[0];
if (!message.containsKey("timestamp")) {
return null;
}
return (Long) message.get("timestamp");
}
@Override
@ -1562,7 +1566,6 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
ArrayList<CompositeData> c = new ArrayList<>();
Filter thefilter = FilterImpl.createFilter(filter);
queue.flushExecutor();
final int attributeSizeLimit = addressSettingsRepository.getMatch(address).getManagementMessageAttributeSizeLimit();
try (LinkedListIterator<MessageReference> iterator = queue.browserIterator()) {
@ -1618,7 +1621,6 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
int currentPageSize = 0;
ArrayList<CompositeData> c = new ArrayList<>();
Filter thefilter = FilterImpl.createFilter(filter);
queue.flushExecutor();
try (LinkedListIterator<MessageReference> iterator = queue.browserIterator()) {
try {
while (iterator.hasNext() && currentPageSize++ < limit) {

View File

@ -832,13 +832,16 @@ public final class PageSubscriptionImpl implements PageSubscription {
@Override
public void processReload() throws Exception {
if (recoveredACK != null) {
logger.trace("********** processing reload!!!!!!!");
if (logger.isDebugEnabled()) {
logger.debug("processing reload queue name={} with id={}", queue != null ? this.queue.getName() : "N/A", cursorId);
}
Collections.sort(recoveredACK);
long txDeleteCursorOnReload = -1;
for (PagePosition pos : recoveredACK) {
logger.trace("reloading pos {}", pos);
lastAckedPosition = pos;
PageCursorInfo pageInfo = getPageInfo(pos);
pageInfo.loadACK(pos);
@ -871,7 +874,7 @@ public final class PageSubscriptionImpl implements PageSubscription {
@Override
public void onDeletePage(Page deletedPage) throws Exception {
logger.trace("removing page {}", deletedPage);
logger.debug("removing page {}", deletedPage);
PageCursorInfo info;
synchronized (consumedPages) {
info = consumedPages.remove(Long.valueOf(deletedPage.getPageId()));
@ -909,6 +912,12 @@ public final class PageSubscriptionImpl implements PageSubscription {
return getPageInfo(pos.getPageNr());
}
public PageCursorInfo locatePageInfo(final long pageNr) {
synchronized (consumedPages) {
return consumedPages.get(pageNr);
}
}
public PageCursorInfo getPageInfo(final long pageNr) {
synchronized (consumedPages) {
PageCursorInfo pageInfo = consumedPages.get(pageNr);
@ -1278,11 +1287,20 @@ public final class PageSubscriptionImpl implements PageSubscription {
private LinkedListIterator<PagedMessage> currentPageIterator;
private void initPage(long page) {
if (logger.isDebugEnabled()) {
logger.debug("initPage {}", page);
}
try {
if (currentPage != null) {
if (logger.isTraceEnabled()) {
logger.trace("usage down {} on subscription {}", currentPage.getPageId(), cursorId);
}
currentPage.usageDown();
}
if (currentPageIterator != null) {
if (logger.isTraceEnabled()) {
logger.trace("closing pageIterator on {}", cursorId);
}
currentPageIterator.close();
}
currentPage = pageStore.usePage(page);
@ -1460,23 +1478,47 @@ public final class PageSubscriptionImpl implements PageSubscription {
private PagedReference internalGetNext() {
for (;;) {
assert currentPageIterator != null : "currentPageIterator is null";
PagedMessage message = currentPageIterator.hasNext() ? currentPageIterator.next() : null;
logger.trace("CursorIterator::internalGetNext:: new reference {}", message);
if (message != null) {
return cursorProvider.newReference(message, PageSubscriptionImpl.this);
}
if (currentPage.getPageId() < pageStore.getCurrentWritingPage()) {
if (logger.isTraceEnabled()) {
logger.trace("Current page {}", currentPage != null ? currentPage.getPageId() : null);
}
long nextPage = getNextPage();
if (logger.isTraceEnabled()) {
logger.trace("next page {}", nextPage);
}
if (nextPage >= 0) {
if (logger.isTraceEnabled()) {
logger.trace("CursorIterator::internalGetNext:: moving to currentPage {}", currentPage.getPageId() + 1);
logger.trace("CursorIterator::internalGetNext:: moving to currentPage {}", nextPage);
}
initPage(currentPage.getPageId() + 1);
initPage(nextPage);
} else {
return null;
}
}
}
private long getNextPage() {
long page = currentPage.getPageId() + 1;
while (page <= pageStore.getCurrentWritingPage()) {
PageCursorInfo info = locatePageInfo(page);
if (info == null || info.getCompleteInfo() == null) {
return page;
}
if (logger.isDebugEnabled()) {
logger.debug("Subscription {} named {} moving faster from page {} to next", cursorId, queue.getName(), page);
}
page++;
}
return -1;
}
@Override
public synchronized NextResult tryNext() {

View File

@ -150,9 +150,6 @@ public final class Page {
}
public synchronized LinkedList<PagedMessage> read(StorageManager storage, boolean onlyLargeMessages) throws Exception {
if (logger.isDebugEnabled()) {
logger.debug("reading page {} on address = {} onlyLargeMessages = {}", pageId, storeName, onlyLargeMessages);
}
if (!file.isOpen()) {
if (!file.exists()) {
@ -161,6 +158,12 @@ public final class Page {
throw ActiveMQMessageBundle.BUNDLE.invalidPageIO();
}
if (logger.isTraceEnabled()) {
logger.trace("reading page {} on address = {} onlyLargeMessages = {}", pageId, storeName, onlyLargeMessages, new Exception("trace"));
} else if (logger.isDebugEnabled()) {
logger.debug("reading page {} on address = {} onlyLargeMessages = {}", pageId, storeName, onlyLargeMessages);
}
size = file.size();
final LinkedList<PagedMessage> messages = new LinkedListImpl<>();

View File

@ -427,6 +427,10 @@ public interface Queue extends Bindable,CriticalComponent {
}
}
default MessageReference peekFirstMessage() {
return null;
}
LinkedListIterator<MessageReference> browserIterator();
SimpleString getExpiryAddress();

View File

@ -1308,7 +1308,6 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
private void deliverAsync(boolean noWait) {
if (scheduledRunners.get() < MAX_SCHEDULED_RUNNERS) {
scheduledRunners.incrementAndGet();
checkDepage();
try {
getExecutor().execute(deliverRunner);
} catch (RejectedExecutionException ignored) {
@ -1638,6 +1637,17 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
return new QueueBrowserIterator();
}
@Override
public MessageReference peekFirstMessage() {
synchronized (this) {
if (messageReferences != null) {
return messageReferences.peek();
}
}
return null;
}
@Override
public synchronized MessageReference removeReferenceWithID(final long id1) throws Exception {
try (LinkedListIterator<MessageReference> iterator = iterator()) {
@ -3164,6 +3174,9 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
return;
}
if (pageIterator != null && pageSubscription.isPaging()) {
if (logger.isDebugEnabled()) {
logger.debug("CheckDepage on queue name {}, id={}", name, id);
}
// we will issue a delivery runnable to check for released space from acks and resume depage
pageDelivered = true;

View File

@ -878,6 +878,21 @@ public final class PriorityLinkedListTest extends Assert {
}
@Test
public void testPeek() {
assertNull(list.peek());
list.addTail(c, 5);
assertEquals(c, list.peek());
list.addTail(k, 0);
assertEquals(k, list.peek());
list.addHead(a, 0);
assertEquals(a, list.peek());
}
@Test
public void testRemoveWithID() {

View File

@ -459,6 +459,25 @@ public class LinkedListTest extends ActiveMQTestBase {
}
}
@Test
public void testPeek() {
assertEquals(0, list.size());
assertNull(list.peek());
list.addTail(10);
assertEquals(10, (int)list.peek());
assertEquals(10, (int)list.poll());
assertNull(list.peek());
list.addTail(12);
assertEquals(12, (int)list.peek());
list.addHead(5);
assertEquals(5, (int)list.peek());
list.poll();
assertEquals(12, (int)list.peek());
list.poll();
assertNull(list.peek());
}
@Test
public void testAddHead() {
int num = 10;