This commit is contained in:
Clebert Suconic 2019-11-22 10:10:17 -05:00
commit 6c33a48c15
1 changed files with 46 additions and 41 deletions

View File

@ -2969,59 +2969,67 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
private void depage(final boolean scheduleExpiry) {
depagePending = false;
synchronized (this) {
if (isPaused() || pageIterator == null) {
return;
}
if (!depageLock.tryLock()) {
return;
}
int maxSize = pageSubscription.getPagingStore().getPageSizeBytes();
long timeout = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(DELIVERY_TIMEOUT);
if (logger.isTraceEnabled()) {
logger.trace("QueueMemorySize before depage on queue=" + this.getName() + " is " + queueMemorySize.get());
}
this.directDeliver = false;
int depaged = 0;
while (timeout - System.nanoTime() > 0 && needsDepage()) {
int status = pageIterator.tryNext();
if (status == 2) {
continue;
} else if (status == 0) {
break;
try {
synchronized (this) {
if (isPaused() || pageIterator == null) {
return;
}
}
depaged++;
PagedReference reference = pageIterator.next();
int maxSize = pageSubscription.getPagingStore().getPageSizeBytes();
long timeout = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(DELIVERY_TIMEOUT);
if (logger.isTraceEnabled()) {
logger.trace("Depaging reference " + reference + " on queue " + this.getName());
logger.trace("QueueMemorySize before depage on queue=" + this.getName() + " is " + queueMemorySize.get());
}
addTail(reference, false);
pageIterator.remove();
//We have to increment this here instead of in the iterator so we have access to the reference from next()
pageSubscription.incrementDeliveredSize(getPersistentSize(reference));
}
this.directDeliver = false;
if (logger.isDebugEnabled()) {
if (depaged == 0 && queueMemorySize.get() >= maxSize) {
logger.debug("Couldn't depage any message as the maxSize on the queue was achieved. " + "There are too many pending messages to be acked in reference to the page configuration");
int depaged = 0;
while (timeout - System.nanoTime() > 0 && needsDepage()) {
int status = pageIterator.tryNext();
if (status == 2) {
continue;
} else if (status == 0) {
break;
}
depaged++;
PagedReference reference = pageIterator.next();
if (logger.isTraceEnabled()) {
logger.trace("Depaging reference " + reference + " on queue " + this.getName());
}
addTail(reference, false);
pageIterator.remove();
//We have to increment this here instead of in the iterator so we have access to the reference from next()
pageSubscription.incrementDeliveredSize(getPersistentSize(reference));
}
if (logger.isDebugEnabled()) {
logger.debug("Queue Memory Size after depage on queue=" + this.getName() + " is " + queueMemorySize.get() + " with maxSize = " + maxSize + ". Depaged " + depaged + " messages, pendingDelivery=" + messageReferences.size() + ", intermediateMessageReferences= " + intermediateMessageReferences.size() + ", queueDelivering=" + deliveringMetrics.getMessageCount());
if (depaged == 0 && queueMemorySize.get() >= maxSize) {
logger.debug("Couldn't depage any message as the maxSize on the queue was achieved. " + "There are too many pending messages to be acked in reference to the page configuration");
}
if (logger.isDebugEnabled()) {
logger.debug("Queue Memory Size after depage on queue=" + this.getName() + " is " + queueMemorySize.get() + " with maxSize = " + maxSize + ". Depaged " + depaged + " messages, pendingDelivery=" + messageReferences.size() + ", intermediateMessageReferences= " + intermediateMessageReferences.size() + ", queueDelivering=" + deliveringMetrics.getMessageCount());
}
}
}
deliverAsync(true);
deliverAsync(true);
if (depaged > 0 && scheduleExpiry) {
// This will just call an executor
expireReferences();
if (depaged > 0 && scheduleExpiry) {
// This will just call an executor
expireReferences();
}
} finally {
depageLock.unlock();
}
}
@ -3888,13 +3896,10 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
@Override
public void run() {
depageLock.lock();
try {
depage(scheduleExpiry);
} catch (Exception e) {
ActiveMQServerLogger.LOGGER.errorDelivering(e);
} finally {
depageLock.unlock();
}
}
}