Additional fix for AMQ-4535: Seems store was getting out of sync /w cursor due to thread unsafe access in the leveldb store.

Switched to a concurrent HashMap to track the transaction since that will be getting accessed by multiple threads.

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1483369 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Hiram R. Chirino 2013-05-16 13:50:35 +00:00
parent 66c8f9c4b8
commit 086e9de9ac
1 changed files with 20 additions and 8 deletions

View File

@ -230,6 +230,7 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P
db.loadCollections
// Finish recovering the prepared XA transactions.
import collection.JavaConversions._
for( (txid, transaction) <- transactions ) {
assert( transaction.xacontainer_id != -1 )
val (msgs, acks) = db.getXAActions(transaction.xacontainer_id)
@ -290,7 +291,7 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P
def createTransactionStore = this
val transactions = collection.mutable.HashMap[TransactionId, Transaction]()
val transactions = new ConcurrentHashMap[TransactionId, Transaction]()
trait TransactionAction {
def commit(uow:DelayableUOW):Unit
@ -395,14 +396,24 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P
}
}
def transaction(txid: TransactionId) = transactions.getOrElseUpdate(txid, Transaction(txid))
def transaction(txid: TransactionId) = {
var rc = transactions.get(txid)
if( rc == null ) {
rc = Transaction(txid)
val prev = transactions.putIfAbsent(txid, rc)
if (prev!=null) {
rc = prev
}
}
rc
}
def commit(txid: TransactionId, wasPrepared: Boolean, preCommit: Runnable, postCommit: Runnable) = {
preCommit.run()
transactions.remove(txid) match {
case None=>
case null =>
postCommit.run()
case Some(tx)=>
case tx =>
val done = new CountDownLatch(1)
withUow { uow =>
for( action <- tx.commitActions ) {
@ -421,9 +432,9 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P
def rollback(txid: TransactionId) = {
transactions.remove(txid) match {
case None=>
case null =>
println("The transaction does not exist")
case Some(tx)=>
case tx =>
if( tx.prepared ) {
val done = new CountDownLatch(1)
withUow { uow =>
@ -441,9 +452,9 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P
def prepare(tx: TransactionId) = {
transactions.get(tx) match {
case None=>
case null =>
println("The transaction does not exist")
case Some(tx)=>
case tx =>
tx.prepare
}
}
@ -452,6 +463,7 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P
def recover(listener: TransactionRecoveryListener) = {
this.doingRecover = true
try {
import collection.JavaConversions._
for ( (txid, transaction) <- transactions ) {
if( transaction.prepared ) {
val (msgs, acks) = transaction.xarecovery