This commit is contained in:
Clebert Suconic 2019-10-12 15:36:05 -04:00
commit 714e31babf
4 changed files with 66 additions and 29 deletions

View File

@ -22,4 +22,7 @@ import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
public interface PageIterator extends LinkedListIterator<PagedReference> {
void redeliver(PagePosition reference);
// return 0 if no elements, 1 if having more elements, 2 if taking too long to find
int tryNext();
}

View File

@ -55,7 +55,7 @@ public interface PageSubscription {
*/
boolean isPaging();
LinkedListIterator<PagedReference> iterator();
PageIterator iterator();
LinkedListIterator<PagedReference> iterator(boolean jumpRemoves);

View File

@ -47,6 +47,7 @@ import org.apache.activemq.artemis.core.paging.cursor.PagePosition;
import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
import org.apache.activemq.artemis.core.paging.cursor.PageSubscriptionCounter;
import org.apache.activemq.artemis.core.paging.cursor.PagedReference;
import org.apache.activemq.artemis.core.paging.cursor.PagedReferenceImpl;
import org.apache.activemq.artemis.core.paging.impl.Page;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
@ -61,10 +62,14 @@ import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet;
import org.apache.activemq.artemis.utils.collections.ConcurrentLongHashMap;
import org.jboss.logging.Logger;
import static org.apache.activemq.artemis.core.server.impl.QueueImpl.DELIVERY_TIMEOUT;
public final class PageSubscriptionImpl implements PageSubscription {
private static final Logger logger = Logger.getLogger(PageSubscriptionImpl.class);
private static final PagedReference dummyPagedRef = new PagedReferenceImpl(null, null, null);
private boolean empty = true;
// Number of scheduled cleanups, to avoid too many schedules
@ -1323,7 +1328,13 @@ public final class PageSubscriptionImpl implements PageSubscription {
PagePositionAndFileOffset lastPosition = position;
PagePositionAndFileOffset tmpPosition = position;
long timeout = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(DELIVERY_TIMEOUT);
do {
if (System.nanoTime() - timeout > 0) {
return dummyPagedRef;
}
synchronized (redeliveries) {
PagePosition redelivery = redeliveries.poll();
@ -1363,6 +1374,8 @@ public final class PageSubscriptionImpl implements PageSubscription {
PageCursorInfo info = getPageInfo(message.getPosition().getPageNr());
position = tmpPosition;
if (!browsing && info != null && (info.isRemoved(message.getPosition()) || info.getCompleteInfo() != null)) {
continue;
}
@ -1398,8 +1411,6 @@ public final class PageSubscriptionImpl implements PageSubscription {
}
}
position = tmpPosition;
if (valid) {
match = match(message.getMessage());
@ -1420,24 +1431,36 @@ public final class PageSubscriptionImpl implements PageSubscription {
}
}
@Override
public synchronized int tryNext() {
// if an unbehaved program called hasNext twice before next, we only cache it once.
if (cachedNext != null) {
return 1;
}
if (!pageStore.isPaging()) {
return 0;
}
PagedReference pagedReference = next();
if (pagedReference == dummyPagedRef) {
return 2;
} else {
cachedNext = pagedReference;
return cachedNext == null ? 0 : 1;
}
}
/**
* QueueImpl::deliver could be calling hasNext while QueueImpl.depage could be using next and hasNext as well.
* It would be a rare race condition but I would prefer avoiding that scenario
*/
@Override
public synchronized boolean hasNext() {
// if an unbehaved program called hasNext twice before next, we only cache it once.
if (cachedNext != null) {
return true;
int status;
while ((status = tryNext()) == 2) {
}
if (!pageStore.isPaging()) {
return false;
}
cachedNext = next();
return cachedNext != null;
return status == 0 ? false : true;
}
@Override

View File

@ -56,6 +56,7 @@ import org.apache.activemq.artemis.api.core.management.ResourceNames;
import org.apache.activemq.artemis.core.PriorityAware;
import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.paging.cursor.PageIterator;
import org.apache.activemq.artemis.core.paging.cursor.PagePosition;
import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
import org.apache.activemq.artemis.core.paging.cursor.PagedReference;
@ -173,7 +174,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
private ReferenceCounter refCountForConsumers;
private final LinkedListIterator<PagedReference> pageIterator;
private final PageIterator pageIterator;
private volatile boolean printErrorExpiring = false;
@ -1123,9 +1124,13 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
@Override
public void deliverAsync() {
deliverAsync(false);
}
private void deliverAsync(boolean noWait) {
if (scheduledRunners.get() < MAX_SCHEDULED_RUNNERS) {
scheduledRunners.incrementAndGet();
checkDepage();
checkDepage(noWait);
try {
getExecutor().execute(deliverRunner);
} catch (RejectedExecutionException ignored) {
@ -1133,7 +1138,6 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
scheduledRunners.decrementAndGet();
}
}
}
@Override
@ -2279,7 +2283,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
}
// If empty we need to schedule depaging to make sure we would depage expired messages as well
if ((!hasElements || expired) && pageIterator != null && pageIterator.hasNext()) {
if ((!hasElements || expired) && pageIterator != null && pageIterator.tryNext() > 0) {
scheduleDepage(true);
}
}
@ -2656,7 +2660,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
if (added++ > MAX_DELIVERIES_IN_LOOP) {
// if we just keep polling from the intermediate we could starve in case there's a sustained load
deliverAsync();
deliverAsync(true);
return;
}
}
@ -2680,24 +2684,24 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
int handled = 0;
long timeout = System.currentTimeMillis() + DELIVERY_TIMEOUT;
long timeout = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(DELIVERY_TIMEOUT);
consumers.reset();
while (true) {
if (handled == MAX_DELIVERIES_IN_LOOP) {
// Schedule another one - we do this to prevent a single thread getting caught up in this loop for too
// long
deliverAsync();
deliverAsync(true);
return false;
}
if (System.currentTimeMillis() > timeout) {
if (System.nanoTime() - timeout > 0) {
if (logger.isTraceEnabled()) {
logger.trace("delivery has been running for too long. Scheduling another delivery task now");
}
deliverAsync();
deliverAsync(true);
return false;
}
@ -2841,8 +2845,8 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
refRemoved(ref);
}
private void checkDepage() {
if (pageIterator != null && pageSubscription.isPaging() && !depagePending && needsDepage() && pageIterator.hasNext()) {
private void checkDepage(boolean noWait) {
if (pageIterator != null && pageSubscription.isPaging() && !depagePending && needsDepage() && (noWait ? pageIterator.tryNext() > 0 : pageIterator.hasNext())) {
scheduleDepage(false);
}
}
@ -2933,7 +2937,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
int maxSize = pageSubscription.getPagingStore().getPageSizeBytes();
long timeout = System.currentTimeMillis() + DELIVERY_TIMEOUT;
long timeout = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(DELIVERY_TIMEOUT);
if (logger.isTraceEnabled()) {
logger.trace("QueueMemorySize before depage on queue=" + this.getName() + " is " + queueMemorySize.get());
@ -2942,7 +2946,14 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
this.directDeliver = false;
int depaged = 0;
while (timeout > System.currentTimeMillis() && needsDepage() && pageIterator.hasNext()) {
while (timeout - System.nanoTime() > 0 && needsDepage()) {
int status = pageIterator.tryNext();
if (status == 2) {
continue;
} else if (status == 0) {
break;
}
depaged++;
PagedReference reference = pageIterator.next();
if (logger.isTraceEnabled()) {
@ -2966,7 +2977,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
}
}
deliverAsync();
deliverAsync(true);
if (depaged > 0 && scheduleExpiry) {
// This will just call an executor
@ -3815,7 +3826,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
if (needCheckDepage) {
enterCritical(CRITICAL_CHECK_DEPAGE);
try {
checkDepage();
checkDepage(true);
} finally {
leaveCritical(CRITICAL_CHECK_DEPAGE);
}