Fixes a ghost messages issue where the queue cursor goes out of sync /w the leveldb store when transactions are being used.

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1490065 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Hiram R. Chirino 2013-06-05 23:01:13 +00:00
parent a0cc2af059
commit 50e8795464
2 changed files with 27 additions and 66 deletions

View File

@ -331,9 +331,7 @@ class DelayableUOW(val manager:DBManager) extends BaseRetained {
val s = size
if( manager.asyncCapacityRemaining.addAndGet(-s) > 0 ) {
asyncCapacityUsed = s
manager.parent.blocking_executor.execute(^{
complete_listeners.foreach(_())
})
complete_listeners.foreach(_())
} else {
manager.asyncCapacityRemaining.addAndGet(s)
}
@ -352,9 +350,7 @@ class DelayableUOW(val manager:DBManager) extends BaseRetained {
asyncCapacityUsed = 0
} else {
manager.uow_complete_latency.add(System.nanoTime() - disposed_at)
manager.parent.blocking_executor.execute(^{
complete_listeners.foreach(_())
})
complete_listeners.foreach(_())
}
countDownFuture.set(null)
@ -665,19 +661,15 @@ class DBManager(val parent:LevelDBStore) {
client.collectionIsEmpty(key)
}
def cursorMessages(key:Long, listener:MessageRecoveryListener, startPos:Long) = {
def cursorMessages(preparedAcks:java.util.HashSet[MessageId], key:Long, listener:MessageRecoveryListener, startPos:Long, max:Long=Long.MaxValue) = {
var lastmsgid:MessageId = null
var count = 0L
client.queueCursor(key, startPos) { msg =>
if( listener.hasSpace ) {
if( listener.recoverMessage(msg) ) {
lastmsgid = msg.getMessageId
true
} else {
false
}
} else {
false
if( !preparedAcks.contains(msg.getMessageId) && listener.recoverMessage(msg) ) {
lastmsgid = msg.getMessageId
count += 1
}
count < max
}
if( lastmsgid==null ) {
startPos

View File

@ -421,18 +421,23 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P
}
def commit(txid: TransactionId, wasPrepared: Boolean, preCommit: Runnable, postCommit: Runnable) = {
preCommit.run()
transactions.remove(txid) match {
case null =>
postCommit.run()
case tx =>
val done = new CountDownLatch(1)
withUow { uow =>
for( action <- tx.commitActions ) {
action.commit(uow)
// Ugly synchronization hack to make sure messages are ordered the way the cursor expects them.
transactions.synchronized {
withUow { uow =>
for( action <- tx.commitActions ) {
action.commit(uow)
}
uow.syncFlag = true
uow.addCompleteListener {
preCommit.run()
done.countDown()
}
}
uow.syncFlag = true
uow.addCompleteListener { done.countDown() }
}
done.await()
if( tx.prepared ) {
@ -611,14 +616,15 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P
case class LevelDBMessageStore(dest: ActiveMQDestination, val key: Long) extends AbstractMessageStore(dest) {
protected val lastSeq: AtomicLong = new AtomicLong(0)
val lastSeq: AtomicLong = new AtomicLong(0)
protected var cursorPosition: Long = 0
val preparedAcks = new HashSet[MessageId]()
lastSeq.set(db.getLastQueueEntrySeq(key))
def doAdd(uow: DelayableUOW, message: Message, delay:Boolean): CountDownFuture[AnyRef] = {
uow.enqueue(key, lastSeq.incrementAndGet, message, delay)
val seq = lastSeq.incrementAndGet()
uow.enqueue(key, seq, message, delay)
}
@ -680,27 +686,7 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P
}
def recover(listener: MessageRecoveryListener): Unit = {
cursorPosition = db.cursorMessages(key, PreparedExcluding(listener), 0)
}
case class PreparedExcluding(listener: MessageRecoveryListener) extends MessageRecoveryListener {
def isDuplicate(ref: MessageId) = listener.isDuplicate(ref)
def hasSpace = listener.hasSpace
def recoverMessageReference(ref: MessageId) = {
if (!preparedAcks.contains(ref)) {
listener.recoverMessageReference(ref)
} else {
true
}
}
def recoverMessage(message: Message) = {
if (!preparedAcks.contains(message.getMessageId)) {
listener.recoverMessage(message)
} else {
true
}
}
cursorPosition = db.cursorMessages(preparedAcks, key, listener, 0)
}
def resetBatching: Unit = {
@ -708,32 +694,15 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P
}
def recoverNextMessages(maxReturned: Int, listener: MessageRecoveryListener): Unit = {
val limiting = LimitingRecoveryListener(maxReturned, listener)
val excluding = PreparedExcluding(limiting)
cursorPosition = db.cursorMessages(key, excluding, cursorPosition)
cursorPosition = db.cursorMessages(preparedAcks, key, listener, cursorPosition, maxReturned)
}
override def setBatch(id: MessageId): Unit = {
cursorPosition = db.queuePosition(id)+1
cursorPosition = db.queuePosition(id)
}
}
case class LimitingRecoveryListener(max: Int, listener: MessageRecoveryListener) extends MessageRecoveryListener {
var recovered: Int = 0
def hasSpace = recovered < max
def recoverMessage(message: Message) = {
recovered += 1;
listener.recoverMessage(message)
}
def recoverMessageReference(ref: MessageId) = {
recovered += 1;
listener.recoverMessageReference(ref)
}
def isDuplicate(ref: MessageId) = listener.isDuplicate(ref)
}
//
// This gts called when the store is first loading up, it restores
// the existing durable subs..
@ -849,13 +818,13 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P
}
def recoverSubscription(clientId: String, subscriptionName: String, listener: MessageRecoveryListener): Unit = {
lookup(clientId, subscriptionName).foreach { sub =>
sub.cursorPosition = db.cursorMessages(key, listener, sub.cursorPosition.max(sub.lastAckPosition+1))
sub.cursorPosition = db.cursorMessages(preparedAcks, key, listener, sub.cursorPosition.max(sub.lastAckPosition+1))
}
}
def recoverNextMessages(clientId: String, subscriptionName: String, maxReturned: Int, listener: MessageRecoveryListener): Unit = {
lookup(clientId, subscriptionName).foreach { sub =>
sub.cursorPosition = db.cursorMessages(key, PreparedExcluding(LimitingRecoveryListener(maxReturned, listener)), sub.cursorPosition.max(sub.lastAckPosition+1))
sub.cursorPosition = db.cursorMessages(preparedAcks, key, listener, sub.cursorPosition.max(sub.lastAckPosition+1), maxReturned)
}
}