mirror of https://github.com/apache/activemq.git
AMQ-9625 - Prevent queue messages from becoming stuck
Fixes a race condition that can cause a message to be missed on dispatch and remain stuck on a Queue until the broker is restarted. The race can occur when caching and concurrentStoreAndDispatch are both enabled on a Queue and the cache then becomes disabled.
This commit is contained in:
parent
3400983a22
commit
7f218fe05d
|
@ -255,12 +255,20 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
|
||||||
disableCache = true;
|
disableCache = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (disableCache && isCacheEnabled()) {
|
// AMQ-9625 - use this.cacheEnabled directly because the method isCacheEnabled() is overridden
|
||||||
|
// to try to re-enable the cache which we don't want at this point as we already skipped
|
||||||
|
// adding it to the cache
|
||||||
|
if (disableCache && this.cacheEnabled) {
|
||||||
if (LOG.isTraceEnabled()) {
|
if (LOG.isTraceEnabled()) {
|
||||||
LOG.trace("{} - disabling cache on add {} {}", this, node.getMessageId(), node.getMessageId().getFutureOrSequenceLong());
|
LOG.trace("{} - disabling cache on add {} {}", this, node.getMessageId(), node.getMessageId().getFutureOrSequenceLong());
|
||||||
}
|
}
|
||||||
syncWithStore(node.getMessage());
|
syncWithStore(node.getMessage());
|
||||||
setCacheEnabled(false);
|
setCacheEnabled(false);
|
||||||
|
} else if (!this.cacheEnabled) {
|
||||||
|
// AMQ-9625 - Verify and wait on previous in flight async messages here if another
|
||||||
|
// thread triggered the cache to be disabled
|
||||||
|
// see the waitForAsyncMessage() method and Jira for more info
|
||||||
|
waitForAsyncMessage(node.getMessage());
|
||||||
}
|
}
|
||||||
size++;
|
size++;
|
||||||
return true;
|
return true;
|
||||||
|
@ -319,6 +327,11 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// AMQ-9625 - If we are disabling the cache and syncing the store then
|
||||||
|
// we need to wait for task to finish before updating the store batch
|
||||||
|
// see the waitForAsyncMessage() method and Jira for more info
|
||||||
|
waitForAsyncMessage(currentAdd);
|
||||||
|
|
||||||
MessageId candidate = lastCachedIds[ASYNC_ADD];
|
MessageId candidate = lastCachedIds[ASYNC_ADD];
|
||||||
if (candidate != null) {
|
if (candidate != null) {
|
||||||
// ensure we don't skip current possibly sync add b/c we waited on the future
|
// ensure we don't skip current possibly sync add b/c we waited on the future
|
||||||
|
@ -530,4 +543,38 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
|
||||||
public Subscription getSubscription() {
|
public Subscription getSubscription() {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// AMQ-9625 - If the cache is disabled check if we need to wait for an async message
|
||||||
|
// to finish its task because the message is not being added to the cache.
|
||||||
|
// Normally, async messages will only be used if the cache is enabled so most of the time
|
||||||
|
// this check should not find any async messages to wait on if the cache is disabled
|
||||||
|
// and is basically a noop.
|
||||||
|
//
|
||||||
|
// However, while messages are being published, if the memory limit is reached the first
|
||||||
|
// thread that is adding the message that reaches the limit will disable the cache.
|
||||||
|
// This means there will be 1 or more potentially outstanding in flight adds that are
|
||||||
|
// queued up as async writes to the store.
|
||||||
|
//
|
||||||
|
// If the cache is disabled, we need to wait for any async message tasks to be
|
||||||
|
// finished otherwise there is a chance of missing the messages on dispatch
|
||||||
|
// when the queue pages in the next batch because store writes will finish after
|
||||||
|
// the store cursor has already moved ahead leading to a stuck message.
|
||||||
|
private void waitForAsyncMessage(Message node) {
|
||||||
|
// Note: isRecievedByDFBridge() was repurposed to be used to mark messages that
|
||||||
|
// are added to the store as async
|
||||||
|
if (node.getMessage().isRecievedByDFBridge()) {
|
||||||
|
final Object futureOrLong = node.getMessageId().getFutureOrSequenceLong();
|
||||||
|
if (futureOrLong instanceof Future) {
|
||||||
|
try {
|
||||||
|
((Future<?>) futureOrLong).get();
|
||||||
|
} catch (Exception exceptionOk) {
|
||||||
|
// We don't care if we get an exception (cancelled, etc) we just want
|
||||||
|
// to ensure the task is finished and not pending.
|
||||||
|
} finally {
|
||||||
|
LOG.trace("{} - future finished inside waitForAsyncMessage {} {}", this,
|
||||||
|
node.getMessageId(), futureOrLong);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue