mirror of https://github.com/apache/activemq.git
related to AMQ-4296 : Fixes leveldb store cursoring. It was recovering too many messages and sometimes not the right messages.
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1485810 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
5b0749c4ba
commit
148909357f
|
@ -667,17 +667,24 @@ class DBManager(val parent:LevelDBStore) {
|
|||
}
|
||||
|
||||
def cursorMessages(key:Long, listener:MessageRecoveryListener, startPos:Long) = {
|
||||
var nextPos = startPos;
|
||||
client.queueCursor(key, nextPos) { msg =>
|
||||
var lastmsgid:MessageId = null
|
||||
client.queueCursor(key, startPos) { msg =>
|
||||
if( listener.hasSpace ) {
|
||||
listener.recoverMessage(msg)
|
||||
nextPos += 1
|
||||
true
|
||||
if( listener.recoverMessage(msg) ) {
|
||||
lastmsgid = msg.getMessageId
|
||||
true
|
||||
} else {
|
||||
false
|
||||
}
|
||||
} else {
|
||||
false
|
||||
}
|
||||
}
|
||||
nextPos
|
||||
if( lastmsgid==null ) {
|
||||
startPos
|
||||
} else {
|
||||
lastmsgid.getEntryLocator.asInstanceOf[EntryLocator].seq+1
|
||||
}
|
||||
}
|
||||
|
||||
def getXAActions(key:Long) = {
|
||||
|
|
|
@ -678,24 +678,26 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P
|
|||
}
|
||||
|
||||
def recover(listener: MessageRecoveryListener): Unit = {
|
||||
cursorPosition = db.cursorMessages(key, preparedExcluding(listener), 0)
|
||||
cursorPosition = db.cursorMessages(key, PreparedExcluding(listener), 0)
|
||||
}
|
||||
|
||||
def preparedExcluding(listener: MessageRecoveryListener) = new MessageRecoveryListener {
|
||||
case class PreparedExcluding(listener: MessageRecoveryListener) extends MessageRecoveryListener {
|
||||
def isDuplicate(ref: MessageId) = listener.isDuplicate(ref)
|
||||
def hasSpace = listener.hasSpace
|
||||
def recoverMessageReference(ref: MessageId) = {
|
||||
if (!preparedAcks.contains(ref)) {
|
||||
listener.recoverMessageReference(ref)
|
||||
} else {
|
||||
true
|
||||
}
|
||||
true
|
||||
}
|
||||
|
||||
def recoverMessage(message: Message) = {
|
||||
if (!preparedAcks.contains(message.getMessageId)) {
|
||||
listener.recoverMessage(message)
|
||||
} else {
|
||||
true
|
||||
}
|
||||
true
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -704,7 +706,8 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P
|
|||
}
|
||||
|
||||
def recoverNextMessages(maxReturned: Int, listener: MessageRecoveryListener): Unit = {
|
||||
cursorPosition = db.cursorMessages(key, preparedExcluding(LimitingRecoveryListener(maxReturned, listener)), cursorPosition)
|
||||
val excluding = PreparedExcluding(LimitingRecoveryListener(maxReturned, listener))
|
||||
cursorPosition = db.cursorMessages(key, excluding, cursorPosition)
|
||||
}
|
||||
|
||||
override def setBatch(id: MessageId): Unit = {
|
||||
|
@ -714,7 +717,7 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P
|
|||
}
|
||||
|
||||
case class LimitingRecoveryListener(max: Int, listener: MessageRecoveryListener) extends MessageRecoveryListener {
|
||||
private var recovered: Int = 0
|
||||
var recovered: Int = 0
|
||||
def hasSpace = recovered < max
|
||||
def recoverMessage(message: Message) = {
|
||||
recovered += 1;
|
||||
|
@ -849,7 +852,7 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P
|
|||
|
||||
def recoverNextMessages(clientId: String, subscriptionName: String, maxReturned: Int, listener: MessageRecoveryListener): Unit = {
|
||||
lookup(clientId, subscriptionName).foreach { sub =>
|
||||
sub.cursorPosition = db.cursorMessages(key, preparedExcluding(LimitingRecoveryListener(maxReturned, listener)), sub.cursorPosition.max(sub.lastAckPosition+1))
|
||||
sub.cursorPosition = db.cursorMessages(key, PreparedExcluding(LimitingRecoveryListener(maxReturned, listener)), sub.cursorPosition.max(sub.lastAckPosition+1))
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue