From cb355bb5847789449988dc4343532628389bc30e Mon Sep 17 00:00:00 2001 From: Wei Yang Date: Mon, 23 Sep 2019 19:50:40 +0800 Subject: [PATCH] ARTEMIS-2515 pageIterator.hasNext spends too much time in the case of no messages matched --- .../core/paging/cursor/PageIterator.java | 3 ++ .../core/paging/cursor/PageSubscription.java | 2 +- .../cursor/impl/PageSubscriptionImpl.java | 49 ++++++++++++++----- .../artemis/core/server/impl/QueueImpl.java | 41 ++++++++++------ 4 files changed, 66 insertions(+), 29 deletions(-) diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageIterator.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageIterator.java index 173213361c..ce7ddb7caa 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageIterator.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageIterator.java @@ -22,4 +22,7 @@ import org.apache.activemq.artemis.utils.collections.LinkedListIterator; public interface PageIterator extends LinkedListIterator { void redeliver(PagePosition reference); + + // return 0 if no elements, 1 if having more elements, 2 if taking too long to find + int tryNext(); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageSubscription.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageSubscription.java index 33092d58cc..eb41e63aae 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageSubscription.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageSubscription.java @@ -55,7 +55,7 @@ public interface PageSubscription { */ boolean isPaging(); - LinkedListIterator iterator(); + PageIterator iterator(); LinkedListIterator iterator(boolean jumpRemoves); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java index 7fee465268..423b588500 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java @@ -47,6 +47,7 @@ import org.apache.activemq.artemis.core.paging.cursor.PagePosition; import org.apache.activemq.artemis.core.paging.cursor.PageSubscription; import org.apache.activemq.artemis.core.paging.cursor.PageSubscriptionCounter; import org.apache.activemq.artemis.core.paging.cursor.PagedReference; +import org.apache.activemq.artemis.core.paging.cursor.PagedReferenceImpl; import org.apache.activemq.artemis.core.paging.impl.Page; import org.apache.activemq.artemis.core.persistence.StorageManager; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; @@ -61,10 +62,14 @@ import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet; import org.apache.activemq.artemis.utils.collections.ConcurrentLongHashMap; import org.jboss.logging.Logger; +import static org.apache.activemq.artemis.core.server.impl.QueueImpl.DELIVERY_TIMEOUT; + public final class PageSubscriptionImpl implements PageSubscription { private static final Logger logger = Logger.getLogger(PageSubscriptionImpl.class); + private static final PagedReference dummyPagedRef = new PagedReferenceImpl(null, null, null); + private boolean empty = true; // Number of scheduled cleanups, to avoid too many schedules @@ -1323,7 +1328,13 @@ public final class PageSubscriptionImpl implements PageSubscription { PagePositionAndFileOffset lastPosition = position; PagePositionAndFileOffset tmpPosition = position; + long timeout = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(DELIVERY_TIMEOUT); + do { + if (System.nanoTime() - timeout > 0) { + return dummyPagedRef; + } + synchronized (redeliveries) { PagePosition redelivery = redeliveries.poll(); @@ -1363,6 +1374,8 @@ public final class PageSubscriptionImpl implements PageSubscription { PageCursorInfo info = getPageInfo(message.getPosition().getPageNr()); + position = tmpPosition; + if (!browsing && info != null && (info.isRemoved(message.getPosition()) || info.getCompleteInfo() != null)) { continue; } @@ -1398,8 +1411,6 @@ public final class PageSubscriptionImpl implements PageSubscription { } } - position = tmpPosition; - if (valid) { match = match(message.getMessage()); @@ -1420,24 +1431,36 @@ public final class PageSubscriptionImpl implements PageSubscription { } } + @Override + public synchronized int tryNext() { + // if an unbehaved program called hasNext twice before next, we only cache it once. + if (cachedNext != null) { + return 1; + } + + if (!pageStore.isPaging()) { + return 0; + } + + PagedReference pagedReference = next(); + if (pagedReference == dummyPagedRef) { + return 2; + } else { + cachedNext = pagedReference; + return cachedNext == null ? 0 : 1; + } + } + /** * QueueImpl::deliver could be calling hasNext while QueueImpl.depage could be using next and hasNext as well. * It would be a rare race condition but I would prefer avoiding that scenario */ @Override public synchronized boolean hasNext() { - // if an unbehaved program called hasNext twice before next, we only cache it once. - if (cachedNext != null) { - return true; + int status; + while ((status = tryNext()) == 2) { } - - if (!pageStore.isPaging()) { - return false; - } - - cachedNext = next(); - - return cachedNext != null; + return status == 0 ? false : true; } @Override diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java index 881d398bb8..f21e7ac3b5 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java @@ -56,6 +56,7 @@ import org.apache.activemq.artemis.api.core.management.ResourceNames; import org.apache.activemq.artemis.core.PriorityAware; import org.apache.activemq.artemis.core.filter.Filter; import org.apache.activemq.artemis.core.io.IOCallback; +import org.apache.activemq.artemis.core.paging.cursor.PageIterator; import org.apache.activemq.artemis.core.paging.cursor.PagePosition; import org.apache.activemq.artemis.core.paging.cursor.PageSubscription; import org.apache.activemq.artemis.core.paging.cursor.PagedReference; @@ -173,7 +174,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { private ReferenceCounter refCountForConsumers; - private final LinkedListIterator pageIterator; + private final PageIterator pageIterator; private volatile boolean printErrorExpiring = false; @@ -1123,9 +1124,13 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { @Override public void deliverAsync() { + deliverAsync(false); + } + + private void deliverAsync(boolean noWait) { if (scheduledRunners.get() < MAX_SCHEDULED_RUNNERS) { scheduledRunners.incrementAndGet(); - checkDepage(); + checkDepage(noWait); try { getExecutor().execute(deliverRunner); } catch (RejectedExecutionException ignored) { @@ -1133,7 +1138,6 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { scheduledRunners.decrementAndGet(); } } - } @Override @@ -2279,7 +2283,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { } // If empty we need to schedule depaging to make sure we would depage expired messages as well - if ((!hasElements || expired) && pageIterator != null && pageIterator.hasNext()) { + if ((!hasElements || expired) && pageIterator != null && pageIterator.tryNext() > 0) { scheduleDepage(true); } } @@ -2656,7 +2660,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { if (added++ > MAX_DELIVERIES_IN_LOOP) { // if we just keep polling from the intermediate we could starve in case there's a sustained load - deliverAsync(); + deliverAsync(true); return; } } @@ -2680,24 +2684,24 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { int handled = 0; - long timeout = System.currentTimeMillis() + DELIVERY_TIMEOUT; + long timeout = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(DELIVERY_TIMEOUT); consumers.reset(); while (true) { if (handled == MAX_DELIVERIES_IN_LOOP) { // Schedule another one - we do this to prevent a single thread getting caught up in this loop for too // long - deliverAsync(); + deliverAsync(true); return false; } - if (System.currentTimeMillis() > timeout) { + if (System.nanoTime() - timeout > 0) { if (logger.isTraceEnabled()) { logger.trace("delivery has been running for too long. Scheduling another delivery task now"); } - deliverAsync(); + deliverAsync(true); return false; } @@ -2841,8 +2845,8 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { refRemoved(ref); } - private void checkDepage() { - if (pageIterator != null && pageSubscription.isPaging() && !depagePending && needsDepage() && pageIterator.hasNext()) { + private void checkDepage(boolean noWait) { + if (pageIterator != null && pageSubscription.isPaging() && !depagePending && needsDepage() && (noWait ? pageIterator.tryNext() > 0 : pageIterator.hasNext())) { scheduleDepage(false); } } @@ -2933,7 +2937,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { int maxSize = pageSubscription.getPagingStore().getPageSizeBytes(); - long timeout = System.currentTimeMillis() + DELIVERY_TIMEOUT; + long timeout = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(DELIVERY_TIMEOUT); if (logger.isTraceEnabled()) { logger.trace("QueueMemorySize before depage on queue=" + this.getName() + " is " + queueMemorySize.get()); @@ -2942,7 +2946,14 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { this.directDeliver = false; int depaged = 0; - while (timeout > System.currentTimeMillis() && needsDepage() && pageIterator.hasNext()) { + while (timeout - System.nanoTime() > 0 && needsDepage()) { + int status = pageIterator.tryNext(); + if (status == 2) { + continue; + } else if (status == 0) { + break; + } + depaged++; PagedReference reference = pageIterator.next(); if (logger.isTraceEnabled()) { @@ -2966,7 +2977,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { } } - deliverAsync(); + deliverAsync(true); if (depaged > 0 && scheduleExpiry) { // This will just call an executor @@ -3815,7 +3826,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { if (needCheckDepage) { enterCritical(CRITICAL_CHECK_DEPAGE); try { - checkDepage(); + checkDepage(true); } finally { leaveCritical(CRITICAL_CHECK_DEPAGE); }