From 148909357f2d56a27fa8e56130da7ee35d504013 Mon Sep 17 00:00:00 2001 From: "Hiram R. Chirino" Date: Thu, 23 May 2013 18:33:06 +0000 Subject: [PATCH] related to AMQ-4296 : Fixes leveldb store cursoring. It was recovering too many messages and sometimes not the right messages. git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1485810 13f79535-47bb-0310-9956-ffa450edef68 --- .../apache/activemq/leveldb/DBManager.scala | 19 +++++++++++++------ .../activemq/leveldb/LevelDBStore.scala | 17 ++++++++++------- 2 files changed, 23 insertions(+), 13 deletions(-) diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala index 97d455dc3b..500a739ff2 100644 --- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala +++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala @@ -667,17 +667,24 @@ class DBManager(val parent:LevelDBStore) { } def cursorMessages(key:Long, listener:MessageRecoveryListener, startPos:Long) = { - var nextPos = startPos; - client.queueCursor(key, nextPos) { msg => + var lastmsgid:MessageId = null + client.queueCursor(key, startPos) { msg => if( listener.hasSpace ) { - listener.recoverMessage(msg) - nextPos += 1 - true + if( listener.recoverMessage(msg) ) { + lastmsgid = msg.getMessageId + true + } else { + false + } } else { false } } - nextPos + if( lastmsgid==null ) { + startPos + } else { + lastmsgid.getEntryLocator.asInstanceOf[EntryLocator].seq+1 + } } def getXAActions(key:Long) = { diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala index 8e07b73851..2015c9948a 100644 --- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala +++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala @@ -678,24 +678,26 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P } def recover(listener: MessageRecoveryListener): Unit = { - cursorPosition = db.cursorMessages(key, preparedExcluding(listener), 0) + cursorPosition = db.cursorMessages(key, PreparedExcluding(listener), 0) } - def preparedExcluding(listener: MessageRecoveryListener) = new MessageRecoveryListener { + case class PreparedExcluding(listener: MessageRecoveryListener) extends MessageRecoveryListener { def isDuplicate(ref: MessageId) = listener.isDuplicate(ref) def hasSpace = listener.hasSpace def recoverMessageReference(ref: MessageId) = { if (!preparedAcks.contains(ref)) { listener.recoverMessageReference(ref) + } else { + true } - true } def recoverMessage(message: Message) = { if (!preparedAcks.contains(message.getMessageId)) { listener.recoverMessage(message) + } else { + true } - true } } @@ -704,7 +706,8 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P } def recoverNextMessages(maxReturned: Int, listener: MessageRecoveryListener): Unit = { - cursorPosition = db.cursorMessages(key, preparedExcluding(LimitingRecoveryListener(maxReturned, listener)), cursorPosition) + val excluding = PreparedExcluding(LimitingRecoveryListener(maxReturned, listener)) + cursorPosition = db.cursorMessages(key, excluding, cursorPosition) } override def setBatch(id: MessageId): Unit = { @@ -714,7 +717,7 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P } case class LimitingRecoveryListener(max: Int, listener: MessageRecoveryListener) extends MessageRecoveryListener { - private var recovered: Int = 0 + var recovered: Int = 0 def hasSpace = recovered < max def recoverMessage(message: Message) = { recovered += 1; @@ -849,7 +852,7 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P def recoverNextMessages(clientId: String, subscriptionName: String, maxReturned: Int, listener: MessageRecoveryListener): Unit = { lookup(clientId, subscriptionName).foreach { sub => - sub.cursorPosition = db.cursorMessages(key, preparedExcluding(LimitingRecoveryListener(maxReturned, listener)), sub.cursorPosition.max(sub.lastAckPosition+1)) + sub.cursorPosition = db.cursorMessages(key, PreparedExcluding(LimitingRecoveryListener(maxReturned, listener)), sub.cursorPosition.max(sub.lastAckPosition+1)) } }