From 350889c1eb3bdb55335721649dda02780f251674 Mon Sep 17 00:00:00 2001 From: gtully Date: Fri, 24 Oct 2014 14:23:06 +0100 Subject: [PATCH] https://issues.apache.org/jira/browse/AMQ-5266 - fix regression in levedlb xa recovery and durable sub scan - org.apache.activemq.broker.mLevelDBXARecoveryBrokerTest and org.apache.activemq.bugs.AMQ2149LevelDBTest --- .../scala/org/apache/activemq/leveldb/LevelDBStore.scala | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala index 52a785a9f2..f86e05be01 100644 --- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala +++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala @@ -686,7 +686,8 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P val sequence = lastSeq.synchronized { val seq = lastSeq.incrementAndGet() message.getMessageId.setFutureOrSequenceLong(seq); - if (indexListener != null) { + // null context on xa recovery, we want to bypass the cursor & pending adds as it will be reset + if (indexListener != null && context != null) { pendingCursorAdds.synchronized { pendingCursorAdds.add(seq) } indexListener.onAdd(new MessageContext(context, message, new Runnable { def run(): Unit = pendingCursorAdds.synchronized { pendingCursorAdds.remove(seq) } @@ -920,7 +921,7 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P def recoverNextMessages(clientId: String, subscriptionName: String, maxReturned: Int, listener: MessageRecoveryListener): Unit = { check_running lookup(clientId, subscriptionName).foreach { sub => - sub.cursorPosition = db.cursorMessages(preparedAcks, key, listener, sub.cursorPosition.max(sub.lastAckPosition+1), maxReturned) + sub.cursorPosition = db.cursorMessages(preparedAcks, key, listener, sub.cursorPosition.max(sub.lastAckPosition+1), Long.MaxValue, maxReturned) } }