diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/EmptyList.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/EmptyList.java index 741365a390..7215a71921 100644 --- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/EmptyList.java +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/EmptyList.java @@ -31,6 +31,10 @@ public class EmptyList implements LinkedList { private EmptyList() { } + @Override + public E peek() { + return null; + } @Override public void addHead(E e) { diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/LinkedList.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/LinkedList.java index 18834564d3..e70f5caa80 100644 --- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/LinkedList.java +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/LinkedList.java @@ -28,6 +28,8 @@ public interface LinkedList { E poll(); + E peek(); + LinkedListIterator iterator(); void clear(); diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/LinkedListImpl.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/LinkedListImpl.java index 4f3b8e9dc9..76ee69499e 100644 --- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/LinkedListImpl.java +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/LinkedListImpl.java @@ -103,6 +103,16 @@ public class LinkedListImpl implements LinkedList { size++; } + @Override + public E peek() { + Node current = head.next; + if (current == null) { + return null; + } else { + return current.val(); + } + } + @Override public E get(int position) { Node current = head.next; diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/PriorityLinkedList.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/PriorityLinkedList.java index 4d132e6554..7d2a3748e5 100644 --- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/PriorityLinkedList.java +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/PriorityLinkedList.java @@ -31,6 +31,9 @@ public interface PriorityLinkedList { E poll(); + /** just look at the first element on the list */ + E peek(); + void clear(); /** diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/PriorityLinkedListImpl.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/PriorityLinkedListImpl.java index 4d01441c9b..73c2aacf8a 100644 --- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/PriorityLinkedListImpl.java +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/PriorityLinkedListImpl.java @@ -126,6 +126,17 @@ public class PriorityLinkedListImpl implements PriorityLinkedList { return null; } + @Override + public E peek() { + for (LinkedListImpl level : levels) { + E value = level.peek(); + if (value != null) { + return value; + } + } + + return null; + } @Override public E poll() { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java index 1ffe4bcfd2..5e31da5aeb 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java @@ -874,7 +874,13 @@ public class QueueControlImpl extends AbstractControl implements QueueControl { } } - protected Map[] getFirstMessage() throws Exception { + /** + * this method returns a Map representing the first message. + * or null if there's no first message. + * @return + * @throws Exception + */ + protected Map getFirstMessage() throws Exception { if (AuditLogger.isBaseLoggingEnabled()) { AuditLogger.getFirstMessage(queue); } @@ -883,16 +889,12 @@ public class QueueControlImpl extends AbstractControl implements QueueControl { clearIO(); try { List> messages = new ArrayList<>(); - queue.flushExecutor(); final int attributeSizeLimit = addressSettingsRepository.getMatch(address).getManagementMessageAttributeSizeLimit(); - try (LinkedListIterator iterator = queue.browserIterator()) { - // returns just the first, as it's the first only - if (iterator.hasNext()) { - MessageReference ref = iterator.next(); - Message message = ref.getMessage(); - messages.add(message.toMap(attributeSizeLimit)); - } - return messages.toArray(new Map[1]); + MessageReference firstMessage = queue.peekFirstMessage(); + if (firstMessage != null) { + return firstMessage.getMessage().toMap(attributeSizeLimit); + } else { + return null; } } finally { blockOnIO(); @@ -905,7 +907,8 @@ public class QueueControlImpl extends AbstractControl implements QueueControl { if (AuditLogger.isBaseLoggingEnabled()) { AuditLogger.getFirstMessageAsJSON(queue); } - return toJSON(getFirstMessage()); + Map message = getFirstMessage(); + return toJSON(message == null ? new Map[0] : new Map[]{message}); } @Override @@ -914,15 +917,16 @@ public class QueueControlImpl extends AbstractControl implements QueueControl { AuditLogger.getFirstMessageTimestamp(queue); } - Map[] _message = getFirstMessage(); - if (_message == null || _message.length == 0 || _message[0] == null) { + Map message = getFirstMessage(); + if (message == null) { return null; + } else { + if (!message.containsKey("timestamp")) { + return null; + } else { + return (Long) message.get("timestamp"); + } } - Map message = _message[0]; - if (!message.containsKey("timestamp")) { - return null; - } - return (Long) message.get("timestamp"); } @Override @@ -1562,7 +1566,6 @@ public class QueueControlImpl extends AbstractControl implements QueueControl { ArrayList c = new ArrayList<>(); Filter thefilter = FilterImpl.createFilter(filter); - queue.flushExecutor(); final int attributeSizeLimit = addressSettingsRepository.getMatch(address).getManagementMessageAttributeSizeLimit(); try (LinkedListIterator iterator = queue.browserIterator()) { @@ -1618,7 +1621,6 @@ public class QueueControlImpl extends AbstractControl implements QueueControl { int currentPageSize = 0; ArrayList c = new ArrayList<>(); Filter thefilter = FilterImpl.createFilter(filter); - queue.flushExecutor(); try (LinkedListIterator iterator = queue.browserIterator()) { try { while (iterator.hasNext() && currentPageSize++ < limit) { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java index 424e2e795b..3be9037fb9 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java @@ -832,13 +832,16 @@ public final class PageSubscriptionImpl implements PageSubscription { @Override public void processReload() throws Exception { if (recoveredACK != null) { - logger.trace("********** processing reload!!!!!!!"); + if (logger.isDebugEnabled()) { + logger.debug("processing reload queue name={} with id={}", queue != null ? this.queue.getName() : "N/A", cursorId); + } Collections.sort(recoveredACK); long txDeleteCursorOnReload = -1; for (PagePosition pos : recoveredACK) { + logger.trace("reloading pos {}", pos); lastAckedPosition = pos; PageCursorInfo pageInfo = getPageInfo(pos); pageInfo.loadACK(pos); @@ -871,7 +874,7 @@ public final class PageSubscriptionImpl implements PageSubscription { @Override public void onDeletePage(Page deletedPage) throws Exception { - logger.trace("removing page {}", deletedPage); + logger.debug("removing page {}", deletedPage); PageCursorInfo info; synchronized (consumedPages) { info = consumedPages.remove(Long.valueOf(deletedPage.getPageId())); @@ -909,6 +912,12 @@ public final class PageSubscriptionImpl implements PageSubscription { return getPageInfo(pos.getPageNr()); } + public PageCursorInfo locatePageInfo(final long pageNr) { + synchronized (consumedPages) { + return consumedPages.get(pageNr); + } + } + public PageCursorInfo getPageInfo(final long pageNr) { synchronized (consumedPages) { PageCursorInfo pageInfo = consumedPages.get(pageNr); @@ -1278,11 +1287,20 @@ public final class PageSubscriptionImpl implements PageSubscription { private LinkedListIterator currentPageIterator; private void initPage(long page) { + if (logger.isDebugEnabled()) { + logger.debug("initPage {}", page); + } try { if (currentPage != null) { + if (logger.isTraceEnabled()) { + logger.trace("usage down {} on subscription {}", currentPage.getPageId(), cursorId); + } currentPage.usageDown(); } if (currentPageIterator != null) { + if (logger.isTraceEnabled()) { + logger.trace("closing pageIterator on {}", cursorId); + } currentPageIterator.close(); } currentPage = pageStore.usePage(page); @@ -1460,23 +1478,47 @@ public final class PageSubscriptionImpl implements PageSubscription { private PagedReference internalGetNext() { for (;;) { + assert currentPageIterator != null : "currentPageIterator is null"; PagedMessage message = currentPageIterator.hasNext() ? currentPageIterator.next() : null; logger.trace("CursorIterator::internalGetNext:: new reference {}", message); if (message != null) { return cursorProvider.newReference(message, PageSubscriptionImpl.this); } - if (currentPage.getPageId() < pageStore.getCurrentWritingPage()) { + if (logger.isTraceEnabled()) { + logger.trace("Current page {}", currentPage != null ? currentPage.getPageId() : null); + } + long nextPage = getNextPage(); + if (logger.isTraceEnabled()) { + logger.trace("next page {}", nextPage); + } + if (nextPage >= 0) { if (logger.isTraceEnabled()) { - logger.trace("CursorIterator::internalGetNext:: moving to currentPage {}", currentPage.getPageId() + 1); + logger.trace("CursorIterator::internalGetNext:: moving to currentPage {}", nextPage); } - initPage(currentPage.getPageId() + 1); + initPage(nextPage); } else { return null; } } } + private long getNextPage() { + long page = currentPage.getPageId() + 1; + + while (page <= pageStore.getCurrentWritingPage()) { + PageCursorInfo info = locatePageInfo(page); + if (info == null || info.getCompleteInfo() == null) { + return page; + } + if (logger.isDebugEnabled()) { + logger.debug("Subscription {} named {} moving faster from page {} to next", cursorId, queue.getName(), page); + } + page++; + } + return -1; + } + @Override public synchronized NextResult tryNext() { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java index bb307968bf..6a0551b430 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java @@ -150,9 +150,6 @@ public final class Page { } public synchronized LinkedList read(StorageManager storage, boolean onlyLargeMessages) throws Exception { - if (logger.isDebugEnabled()) { - logger.debug("reading page {} on address = {} onlyLargeMessages = {}", pageId, storeName, onlyLargeMessages); - } if (!file.isOpen()) { if (!file.exists()) { @@ -161,6 +158,12 @@ public final class Page { throw ActiveMQMessageBundle.BUNDLE.invalidPageIO(); } + if (logger.isTraceEnabled()) { + logger.trace("reading page {} on address = {} onlyLargeMessages = {}", pageId, storeName, onlyLargeMessages, new Exception("trace")); + } else if (logger.isDebugEnabled()) { + logger.debug("reading page {} on address = {} onlyLargeMessages = {}", pageId, storeName, onlyLargeMessages); + } + size = file.size(); final LinkedList messages = new LinkedListImpl<>(); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java index cc99a18a7a..81dcf4cbb9 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java @@ -427,6 +427,10 @@ public interface Queue extends Bindable,CriticalComponent { } } + default MessageReference peekFirstMessage() { + return null; + } + LinkedListIterator browserIterator(); SimpleString getExpiryAddress(); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java index 66d6ff789b..908bdca23e 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java @@ -1308,7 +1308,6 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { private void deliverAsync(boolean noWait) { if (scheduledRunners.get() < MAX_SCHEDULED_RUNNERS) { scheduledRunners.incrementAndGet(); - checkDepage(); try { getExecutor().execute(deliverRunner); } catch (RejectedExecutionException ignored) { @@ -1638,6 +1637,17 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { return new QueueBrowserIterator(); } + @Override + public MessageReference peekFirstMessage() { + synchronized (this) { + if (messageReferences != null) { + return messageReferences.peek(); + } + } + + return null; + } + @Override public synchronized MessageReference removeReferenceWithID(final long id1) throws Exception { try (LinkedListIterator iterator = iterator()) { @@ -3164,6 +3174,9 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { return; } if (pageIterator != null && pageSubscription.isPaging()) { + if (logger.isDebugEnabled()) { + logger.debug("CheckDepage on queue name {}, id={}", name, id); + } // we will issue a delivery runnable to check for released space from acks and resume depage pageDelivered = true; diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/list/PriorityLinkedListTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/list/PriorityLinkedListTest.java index af46b68f73..a2bfaec2bb 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/list/PriorityLinkedListTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/list/PriorityLinkedListTest.java @@ -878,6 +878,21 @@ public final class PriorityLinkedListTest extends Assert { } + @Test + public void testPeek() { + assertNull(list.peek()); + + list.addTail(c, 5); + assertEquals(c, list.peek()); + + list.addTail(k, 0); + assertEquals(k, list.peek()); + + list.addHead(a, 0); + assertEquals(a, list.peek()); + } + + @Test public void testRemoveWithID() { diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/util/LinkedListTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/util/LinkedListTest.java index 47f3761f52..2245e8b02c 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/util/LinkedListTest.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/util/LinkedListTest.java @@ -459,6 +459,25 @@ public class LinkedListTest extends ActiveMQTestBase { } } + + @Test + public void testPeek() { + assertEquals(0, list.size()); + assertNull(list.peek()); + list.addTail(10); + assertEquals(10, (int)list.peek()); + assertEquals(10, (int)list.poll()); + assertNull(list.peek()); + list.addTail(12); + assertEquals(12, (int)list.peek()); + list.addHead(5); + assertEquals(5, (int)list.peek()); + list.poll(); + assertEquals(12, (int)list.peek()); + list.poll(); + assertNull(list.peek()); + } + @Test public void testAddHead() { int num = 10;