diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java index 71a83acaef..30089e3551 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java @@ -255,12 +255,20 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i disableCache = true; } - if (disableCache && isCacheEnabled()) { + // AMQ-9625 - use this.cacheEnabled directly because the method isCacheEnabled() is overriden + // to try to re-enable the cache which we don't want at this point as we already skipped + // adding it to the cache + if (disableCache && this.cacheEnabled) { if (LOG.isTraceEnabled()) { LOG.trace("{} - disabling cache on add {} {}", this, node.getMessageId(), node.getMessageId().getFutureOrSequenceLong()); } syncWithStore(node.getMessage()); setCacheEnabled(false); + } else if (!this.cacheEnabled) { + // AMQ-9625 - Verify and wait on previous in flight async messages here if another + // thread triggered the cache to be disabled + // see the waitForAsyncMessage() method and Jira for more info + waitForAsyncMessage(node.getMessage()); } size++; return true; @@ -319,6 +327,11 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i break; } + // AMQ-9625 - If we are disabling the cache and syncing the store then + // we need to wait for task to finish before updating the store batch + // see the waitForAsyncMessage() method and Jira for more info + waitForAsyncMessage(currentAdd); + MessageId candidate = lastCachedIds[ASYNC_ADD]; if (candidate != null) { // ensure we don't skip current possibly sync add b/c we waited on the future @@ -530,4 +543,38 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i public Subscription getSubscription() { return null; } + + // AMQ-9625 - If the cache is disabled check if we need to wait for an async message + // to finish its task because the message is not being added to the cache. + // Normally, async messages will only be used if the cache is enabled so most of the time + // this check should not find any async messages to wait on if the cache is disabled + // and is basically a noop. + // + // However, while messages are being published, if the memory limit is reached the first + // thread that is adding the message that reaches the limit will disable the cache. + // This means there will be 1 or more potentially outstanding in flight adds that are + // queued up as async writes to the store. + // + // If the cache is disabled, we need to wait for any async message tasks to be + // finished otherwise there is a chance of missing the messages on dispatch + // when the queue pages in the next batch because store writes will finish after + // the store cursor has already moved ahead leading to a stuck message. + private void waitForAsyncMessage(Message node) { + // Note: isRecievedByDFBridge() was repurposed to be used to mark messages that + // are added to the store as async + if (node.getMessage().isRecievedByDFBridge()) { + final Object futureOrLong = node.getMessageId().getFutureOrSequenceLong(); + if (futureOrLong instanceof Future) { + try { + ((Future) futureOrLong).get(); + } catch (Exception exceptionOk) { + // We don't care if we get an exception (cancelled, etc) we just want + // to ensure the task is finished and not pending. + } finally { + LOG.trace("{} - future finished inside waitForAsyncMessage {} {}", this, + node.getMessageId(), futureOrLong); + } + } + } + } }