ARTEMIS-2538 Removing all messages from a huge queue causes OOM

The PageSubscriptionImpl.cleanupEntries could be locked by the queue
depage because they are executed with the same executor and the depage
could be locked by the iterQueue.
If PageSubscriptionImpl.cleanupEntries is locked, no one clean up the
JournalRecord and PagePositionImpl instances created during iterQueue.
So removing all messages from a huge queue, causes the retention of too
JournalRecord and PagePositionImpl instances until an OOM.
To avoid to lock the PageSubscriptionImpl.cleanupEntries the depage is
executed only if the queue isn't iterating.
This commit is contained in:
brusdev 2019-11-15 18:17:45 +01:00 committed by Clebert Suconic
parent 8969045f92
commit 9946d8e63c
1 changed files with 46 additions and 41 deletions

View File

@ -2969,59 +2969,67 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
private void depage(final boolean scheduleExpiry) {
depagePending = false;
synchronized (this) {
if (isPaused() || pageIterator == null) {
return;
}
if (!depageLock.tryLock()) {
return;
}
int maxSize = pageSubscription.getPagingStore().getPageSizeBytes();
long timeout = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(DELIVERY_TIMEOUT);
if (logger.isTraceEnabled()) {
logger.trace("QueueMemorySize before depage on queue=" + this.getName() + " is " + queueMemorySize.get());
}
this.directDeliver = false;
int depaged = 0;
while (timeout - System.nanoTime() > 0 && needsDepage()) {
int status = pageIterator.tryNext();
if (status == 2) {
continue;
} else if (status == 0) {
break;
try {
synchronized (this) {
if (isPaused() || pageIterator == null) {
return;
}
}
depaged++;
PagedReference reference = pageIterator.next();
int maxSize = pageSubscription.getPagingStore().getPageSizeBytes();
long timeout = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(DELIVERY_TIMEOUT);
if (logger.isTraceEnabled()) {
logger.trace("Depaging reference " + reference + " on queue " + this.getName());
logger.trace("QueueMemorySize before depage on queue=" + this.getName() + " is " + queueMemorySize.get());
}
addTail(reference, false);
pageIterator.remove();
//We have to increment this here instead of in the iterator so we have access to the reference from next()
pageSubscription.incrementDeliveredSize(getPersistentSize(reference));
}
this.directDeliver = false;
if (logger.isDebugEnabled()) {
if (depaged == 0 && queueMemorySize.get() >= maxSize) {
logger.debug("Couldn't depage any message as the maxSize on the queue was achieved. " + "There are too many pending messages to be acked in reference to the page configuration");
int depaged = 0;
while (timeout - System.nanoTime() > 0 && needsDepage()) {
int status = pageIterator.tryNext();
if (status == 2) {
continue;
} else if (status == 0) {
break;
}
depaged++;
PagedReference reference = pageIterator.next();
if (logger.isTraceEnabled()) {
logger.trace("Depaging reference " + reference + " on queue " + this.getName());
}
addTail(reference, false);
pageIterator.remove();
//We have to increment this here instead of in the iterator so we have access to the reference from next()
pageSubscription.incrementDeliveredSize(getPersistentSize(reference));
}
if (logger.isDebugEnabled()) {
logger.debug("Queue Memory Size after depage on queue=" + this.getName() + " is " + queueMemorySize.get() + " with maxSize = " + maxSize + ". Depaged " + depaged + " messages, pendingDelivery=" + messageReferences.size() + ", intermediateMessageReferences= " + intermediateMessageReferences.size() + ", queueDelivering=" + deliveringMetrics.getMessageCount());
if (depaged == 0 && queueMemorySize.get() >= maxSize) {
logger.debug("Couldn't depage any message as the maxSize on the queue was achieved. " + "There are too many pending messages to be acked in reference to the page configuration");
}
if (logger.isDebugEnabled()) {
logger.debug("Queue Memory Size after depage on queue=" + this.getName() + " is " + queueMemorySize.get() + " with maxSize = " + maxSize + ". Depaged " + depaged + " messages, pendingDelivery=" + messageReferences.size() + ", intermediateMessageReferences= " + intermediateMessageReferences.size() + ", queueDelivering=" + deliveringMetrics.getMessageCount());
}
}
}
deliverAsync(true);
deliverAsync(true);
if (depaged > 0 && scheduleExpiry) {
// This will just call an executor
expireReferences();
if (depaged > 0 && scheduleExpiry) {
// This will just call an executor
expireReferences();
}
} finally {
depageLock.unlock();
}
}
@ -3888,13 +3896,10 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
@Override
public void run() {
depageLock.lock();
try {
depage(scheduleExpiry);
} catch (Exception e) {
ActiveMQServerLogger.LOGGER.errorDelivering(e);
} finally {
depageLock.unlock();
}
}
}