ARTEMIS-2515 pageIterator.hasNext spends too much time in the case of no messages matched
This commit is contained in:
parent
149e26075b
commit
cb355bb584
|
@ -22,4 +22,7 @@ import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
|
||||||
public interface PageIterator extends LinkedListIterator<PagedReference> {
|
public interface PageIterator extends LinkedListIterator<PagedReference> {
|
||||||
|
|
||||||
void redeliver(PagePosition reference);
|
void redeliver(PagePosition reference);
|
||||||
|
|
||||||
|
// return 0 if no elements, 1 if having more elements, 2 if taking too long to find
|
||||||
|
int tryNext();
|
||||||
}
|
}
|
||||||
|
|
|
@ -55,7 +55,7 @@ public interface PageSubscription {
|
||||||
*/
|
*/
|
||||||
boolean isPaging();
|
boolean isPaging();
|
||||||
|
|
||||||
LinkedListIterator<PagedReference> iterator();
|
PageIterator iterator();
|
||||||
|
|
||||||
LinkedListIterator<PagedReference> iterator(boolean jumpRemoves);
|
LinkedListIterator<PagedReference> iterator(boolean jumpRemoves);
|
||||||
|
|
||||||
|
|
|
@ -47,6 +47,7 @@ import org.apache.activemq.artemis.core.paging.cursor.PagePosition;
|
||||||
import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
|
import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
|
||||||
import org.apache.activemq.artemis.core.paging.cursor.PageSubscriptionCounter;
|
import org.apache.activemq.artemis.core.paging.cursor.PageSubscriptionCounter;
|
||||||
import org.apache.activemq.artemis.core.paging.cursor.PagedReference;
|
import org.apache.activemq.artemis.core.paging.cursor.PagedReference;
|
||||||
|
import org.apache.activemq.artemis.core.paging.cursor.PagedReferenceImpl;
|
||||||
import org.apache.activemq.artemis.core.paging.impl.Page;
|
import org.apache.activemq.artemis.core.paging.impl.Page;
|
||||||
import org.apache.activemq.artemis.core.persistence.StorageManager;
|
import org.apache.activemq.artemis.core.persistence.StorageManager;
|
||||||
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
|
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
|
||||||
|
@ -61,10 +62,14 @@ import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet;
|
||||||
import org.apache.activemq.artemis.utils.collections.ConcurrentLongHashMap;
|
import org.apache.activemq.artemis.utils.collections.ConcurrentLongHashMap;
|
||||||
import org.jboss.logging.Logger;
|
import org.jboss.logging.Logger;
|
||||||
|
|
||||||
|
import static org.apache.activemq.artemis.core.server.impl.QueueImpl.DELIVERY_TIMEOUT;
|
||||||
|
|
||||||
public final class PageSubscriptionImpl implements PageSubscription {
|
public final class PageSubscriptionImpl implements PageSubscription {
|
||||||
|
|
||||||
private static final Logger logger = Logger.getLogger(PageSubscriptionImpl.class);
|
private static final Logger logger = Logger.getLogger(PageSubscriptionImpl.class);
|
||||||
|
|
||||||
|
private static final PagedReference dummyPagedRef = new PagedReferenceImpl(null, null, null);
|
||||||
|
|
||||||
private boolean empty = true;
|
private boolean empty = true;
|
||||||
|
|
||||||
// Number of scheduled cleanups, to avoid too many schedules
|
// Number of scheduled cleanups, to avoid too many schedules
|
||||||
|
@ -1323,7 +1328,13 @@ public final class PageSubscriptionImpl implements PageSubscription {
|
||||||
PagePositionAndFileOffset lastPosition = position;
|
PagePositionAndFileOffset lastPosition = position;
|
||||||
PagePositionAndFileOffset tmpPosition = position;
|
PagePositionAndFileOffset tmpPosition = position;
|
||||||
|
|
||||||
|
long timeout = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(DELIVERY_TIMEOUT);
|
||||||
|
|
||||||
do {
|
do {
|
||||||
|
if (System.nanoTime() - timeout > 0) {
|
||||||
|
return dummyPagedRef;
|
||||||
|
}
|
||||||
|
|
||||||
synchronized (redeliveries) {
|
synchronized (redeliveries) {
|
||||||
PagePosition redelivery = redeliveries.poll();
|
PagePosition redelivery = redeliveries.poll();
|
||||||
|
|
||||||
|
@ -1363,6 +1374,8 @@ public final class PageSubscriptionImpl implements PageSubscription {
|
||||||
|
|
||||||
PageCursorInfo info = getPageInfo(message.getPosition().getPageNr());
|
PageCursorInfo info = getPageInfo(message.getPosition().getPageNr());
|
||||||
|
|
||||||
|
position = tmpPosition;
|
||||||
|
|
||||||
if (!browsing && info != null && (info.isRemoved(message.getPosition()) || info.getCompleteInfo() != null)) {
|
if (!browsing && info != null && (info.isRemoved(message.getPosition()) || info.getCompleteInfo() != null)) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
@ -1398,8 +1411,6 @@ public final class PageSubscriptionImpl implements PageSubscription {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
position = tmpPosition;
|
|
||||||
|
|
||||||
if (valid) {
|
if (valid) {
|
||||||
match = match(message.getMessage());
|
match = match(message.getMessage());
|
||||||
|
|
||||||
|
@ -1420,24 +1431,36 @@ public final class PageSubscriptionImpl implements PageSubscription {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public synchronized int tryNext() {
|
||||||
|
// if an unbehaved program called hasNext twice before next, we only cache it once.
|
||||||
|
if (cachedNext != null) {
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!pageStore.isPaging()) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
PagedReference pagedReference = next();
|
||||||
|
if (pagedReference == dummyPagedRef) {
|
||||||
|
return 2;
|
||||||
|
} else {
|
||||||
|
cachedNext = pagedReference;
|
||||||
|
return cachedNext == null ? 0 : 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* QueueImpl::deliver could be calling hasNext while QueueImpl.depage could be using next and hasNext as well.
|
* QueueImpl::deliver could be calling hasNext while QueueImpl.depage could be using next and hasNext as well.
|
||||||
* It would be a rare race condition but I would prefer avoiding that scenario
|
* It would be a rare race condition but I would prefer avoiding that scenario
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public synchronized boolean hasNext() {
|
public synchronized boolean hasNext() {
|
||||||
// if an unbehaved program called hasNext twice before next, we only cache it once.
|
int status;
|
||||||
if (cachedNext != null) {
|
while ((status = tryNext()) == 2) {
|
||||||
return true;
|
|
||||||
}
|
}
|
||||||
|
return status == 0 ? false : true;
|
||||||
if (!pageStore.isPaging()) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
cachedNext = next();
|
|
||||||
|
|
||||||
return cachedNext != null;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -56,6 +56,7 @@ import org.apache.activemq.artemis.api.core.management.ResourceNames;
|
||||||
import org.apache.activemq.artemis.core.PriorityAware;
|
import org.apache.activemq.artemis.core.PriorityAware;
|
||||||
import org.apache.activemq.artemis.core.filter.Filter;
|
import org.apache.activemq.artemis.core.filter.Filter;
|
||||||
import org.apache.activemq.artemis.core.io.IOCallback;
|
import org.apache.activemq.artemis.core.io.IOCallback;
|
||||||
|
import org.apache.activemq.artemis.core.paging.cursor.PageIterator;
|
||||||
import org.apache.activemq.artemis.core.paging.cursor.PagePosition;
|
import org.apache.activemq.artemis.core.paging.cursor.PagePosition;
|
||||||
import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
|
import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
|
||||||
import org.apache.activemq.artemis.core.paging.cursor.PagedReference;
|
import org.apache.activemq.artemis.core.paging.cursor.PagedReference;
|
||||||
|
@ -173,7 +174,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
|
||||||
|
|
||||||
private ReferenceCounter refCountForConsumers;
|
private ReferenceCounter refCountForConsumers;
|
||||||
|
|
||||||
private final LinkedListIterator<PagedReference> pageIterator;
|
private final PageIterator pageIterator;
|
||||||
|
|
||||||
private volatile boolean printErrorExpiring = false;
|
private volatile boolean printErrorExpiring = false;
|
||||||
|
|
||||||
|
@ -1123,9 +1124,13 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void deliverAsync() {
|
public void deliverAsync() {
|
||||||
|
deliverAsync(false);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void deliverAsync(boolean noWait) {
|
||||||
if (scheduledRunners.get() < MAX_SCHEDULED_RUNNERS) {
|
if (scheduledRunners.get() < MAX_SCHEDULED_RUNNERS) {
|
||||||
scheduledRunners.incrementAndGet();
|
scheduledRunners.incrementAndGet();
|
||||||
checkDepage();
|
checkDepage(noWait);
|
||||||
try {
|
try {
|
||||||
getExecutor().execute(deliverRunner);
|
getExecutor().execute(deliverRunner);
|
||||||
} catch (RejectedExecutionException ignored) {
|
} catch (RejectedExecutionException ignored) {
|
||||||
|
@ -1133,7 +1138,6 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
|
||||||
scheduledRunners.decrementAndGet();
|
scheduledRunners.decrementAndGet();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -2279,7 +2283,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
|
||||||
}
|
}
|
||||||
|
|
||||||
// If empty we need to schedule depaging to make sure we would depage expired messages as well
|
// If empty we need to schedule depaging to make sure we would depage expired messages as well
|
||||||
if ((!hasElements || expired) && pageIterator != null && pageIterator.hasNext()) {
|
if ((!hasElements || expired) && pageIterator != null && pageIterator.tryNext() > 0) {
|
||||||
scheduleDepage(true);
|
scheduleDepage(true);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -2656,7 +2660,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
|
||||||
|
|
||||||
if (added++ > MAX_DELIVERIES_IN_LOOP) {
|
if (added++ > MAX_DELIVERIES_IN_LOOP) {
|
||||||
// if we just keep polling from the intermediate we could starve in case there's a sustained load
|
// if we just keep polling from the intermediate we could starve in case there's a sustained load
|
||||||
deliverAsync();
|
deliverAsync(true);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -2680,24 +2684,24 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
|
||||||
|
|
||||||
int handled = 0;
|
int handled = 0;
|
||||||
|
|
||||||
long timeout = System.currentTimeMillis() + DELIVERY_TIMEOUT;
|
long timeout = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(DELIVERY_TIMEOUT);
|
||||||
consumers.reset();
|
consumers.reset();
|
||||||
while (true) {
|
while (true) {
|
||||||
if (handled == MAX_DELIVERIES_IN_LOOP) {
|
if (handled == MAX_DELIVERIES_IN_LOOP) {
|
||||||
// Schedule another one - we do this to prevent a single thread getting caught up in this loop for too
|
// Schedule another one - we do this to prevent a single thread getting caught up in this loop for too
|
||||||
// long
|
// long
|
||||||
|
|
||||||
deliverAsync();
|
deliverAsync(true);
|
||||||
|
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (System.currentTimeMillis() > timeout) {
|
if (System.nanoTime() - timeout > 0) {
|
||||||
if (logger.isTraceEnabled()) {
|
if (logger.isTraceEnabled()) {
|
||||||
logger.trace("delivery has been running for too long. Scheduling another delivery task now");
|
logger.trace("delivery has been running for too long. Scheduling another delivery task now");
|
||||||
}
|
}
|
||||||
|
|
||||||
deliverAsync();
|
deliverAsync(true);
|
||||||
|
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
@ -2841,8 +2845,8 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
|
||||||
refRemoved(ref);
|
refRemoved(ref);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void checkDepage() {
|
private void checkDepage(boolean noWait) {
|
||||||
if (pageIterator != null && pageSubscription.isPaging() && !depagePending && needsDepage() && pageIterator.hasNext()) {
|
if (pageIterator != null && pageSubscription.isPaging() && !depagePending && needsDepage() && (noWait ? pageIterator.tryNext() > 0 : pageIterator.hasNext())) {
|
||||||
scheduleDepage(false);
|
scheduleDepage(false);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -2933,7 +2937,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
|
||||||
|
|
||||||
int maxSize = pageSubscription.getPagingStore().getPageSizeBytes();
|
int maxSize = pageSubscription.getPagingStore().getPageSizeBytes();
|
||||||
|
|
||||||
long timeout = System.currentTimeMillis() + DELIVERY_TIMEOUT;
|
long timeout = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(DELIVERY_TIMEOUT);
|
||||||
|
|
||||||
if (logger.isTraceEnabled()) {
|
if (logger.isTraceEnabled()) {
|
||||||
logger.trace("QueueMemorySize before depage on queue=" + this.getName() + " is " + queueMemorySize.get());
|
logger.trace("QueueMemorySize before depage on queue=" + this.getName() + " is " + queueMemorySize.get());
|
||||||
|
@ -2942,7 +2946,14 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
|
||||||
this.directDeliver = false;
|
this.directDeliver = false;
|
||||||
|
|
||||||
int depaged = 0;
|
int depaged = 0;
|
||||||
while (timeout > System.currentTimeMillis() && needsDepage() && pageIterator.hasNext()) {
|
while (timeout - System.nanoTime() > 0 && needsDepage()) {
|
||||||
|
int status = pageIterator.tryNext();
|
||||||
|
if (status == 2) {
|
||||||
|
continue;
|
||||||
|
} else if (status == 0) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
depaged++;
|
depaged++;
|
||||||
PagedReference reference = pageIterator.next();
|
PagedReference reference = pageIterator.next();
|
||||||
if (logger.isTraceEnabled()) {
|
if (logger.isTraceEnabled()) {
|
||||||
|
@ -2966,7 +2977,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
deliverAsync();
|
deliverAsync(true);
|
||||||
|
|
||||||
if (depaged > 0 && scheduleExpiry) {
|
if (depaged > 0 && scheduleExpiry) {
|
||||||
// This will just call an executor
|
// This will just call an executor
|
||||||
|
@ -3815,7 +3826,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
|
||||||
if (needCheckDepage) {
|
if (needCheckDepage) {
|
||||||
enterCritical(CRITICAL_CHECK_DEPAGE);
|
enterCritical(CRITICAL_CHECK_DEPAGE);
|
||||||
try {
|
try {
|
||||||
checkDepage();
|
checkDepage(true);
|
||||||
} finally {
|
} finally {
|
||||||
leaveCritical(CRITICAL_CHECK_DEPAGE);
|
leaveCritical(CRITICAL_CHECK_DEPAGE);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue