mirror of https://github.com/apache/activemq.git
https://issues.apache.org/jira/browse/AMQ-5266 - fix regression in levedlb xa recovery and durable sub scan - org.apache.activemq.broker.mLevelDBXARecoveryBrokerTest and org.apache.activemq.bugs.AMQ2149LevelDBTest
This commit is contained in:
parent
1948fe41a3
commit
350889c1eb
|
@ -686,7 +686,8 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P
|
||||||
val sequence = lastSeq.synchronized {
|
val sequence = lastSeq.synchronized {
|
||||||
val seq = lastSeq.incrementAndGet()
|
val seq = lastSeq.incrementAndGet()
|
||||||
message.getMessageId.setFutureOrSequenceLong(seq);
|
message.getMessageId.setFutureOrSequenceLong(seq);
|
||||||
if (indexListener != null) {
|
// null context on xa recovery, we want to bypass the cursor & pending adds as it will be reset
|
||||||
|
if (indexListener != null && context != null) {
|
||||||
pendingCursorAdds.synchronized { pendingCursorAdds.add(seq) }
|
pendingCursorAdds.synchronized { pendingCursorAdds.add(seq) }
|
||||||
indexListener.onAdd(new MessageContext(context, message, new Runnable {
|
indexListener.onAdd(new MessageContext(context, message, new Runnable {
|
||||||
def run(): Unit = pendingCursorAdds.synchronized { pendingCursorAdds.remove(seq) }
|
def run(): Unit = pendingCursorAdds.synchronized { pendingCursorAdds.remove(seq) }
|
||||||
|
@ -920,7 +921,7 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P
|
||||||
def recoverNextMessages(clientId: String, subscriptionName: String, maxReturned: Int, listener: MessageRecoveryListener): Unit = {
|
def recoverNextMessages(clientId: String, subscriptionName: String, maxReturned: Int, listener: MessageRecoveryListener): Unit = {
|
||||||
check_running
|
check_running
|
||||||
lookup(clientId, subscriptionName).foreach { sub =>
|
lookup(clientId, subscriptionName).foreach { sub =>
|
||||||
sub.cursorPosition = db.cursorMessages(preparedAcks, key, listener, sub.cursorPosition.max(sub.lastAckPosition+1), maxReturned)
|
sub.cursorPosition = db.cursorMessages(preparedAcks, key, listener, sub.cursorPosition.max(sub.lastAckPosition+1), Long.MaxValue, maxReturned)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue