Fix for failing LveelDB unit tests where only non-persistent messages are sent in a TX. The preCommit wasn't being run so the Queue's orderIndexUpdates structure wasn't getting updated with the TX to process in the postCommit phase.

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1501420 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Timothy A. Bish 2013-07-09 18:15:26 +00:00
parent cb52dd47f4
commit 5f0dd8ab53
1 changed files with 16 additions and 14 deletions

View File

@ -49,7 +49,7 @@ object LevelDBStore extends Log {
val DONE = new CountDownFuture[AnyRef]();
DONE.set(null)
def toIOException(e: Throwable): IOException = {
if (e.isInstanceOf[ExecutionException]) {
var cause: Throwable = (e.asInstanceOf[ExecutionException]).getCause
@ -143,7 +143,7 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P
var directory = DEFAULT_DIRECTORY
@BeanProperty
var logDirectory: File = null
@BeanProperty
var logSize: Long = 1024 * 1024 * 100
@BeanProperty
@ -301,13 +301,13 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P
def createTransactionStore = this
val transactions = new ConcurrentHashMap[TransactionId, Transaction]()
trait TransactionAction {
def commit(uow:DelayableUOW):Unit
def prepare(uow:DelayableUOW):Unit
def rollback(uow:DelayableUOW):Unit
}
case class Transaction(id:TransactionId) {
val commitActions = ListBuffer[TransactionAction]()
@ -405,7 +405,7 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P
}
}
}
def transaction(txid: TransactionId) = {
var rc = transactions.get(txid)
if( rc == null ) {
@ -417,10 +417,12 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P
}
rc
}
def commit(txid: TransactionId, wasPrepared: Boolean, preCommit: Runnable, postCommit: Runnable) = {
transactions.remove(txid) match {
case null =>
// Only in-flight non-persistent messages in this TX.
preCommit.run()
postCommit.run()
case tx =>
val done = new CountDownLatch(1)
@ -715,8 +717,8 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P
db.removeSubscription(sub)
}
}
def getTopicGCPositions = {
import collection.JavaConversions._
val topics = this.synchronized {
@ -755,7 +757,7 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P
}
}
}
def addSubsciption(info: SubscriptionInfo, retroactive: Boolean) = {
var sub = db.addSubscription(key, info)
subscriptions.synchronized {
@ -768,7 +770,7 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P
uow.countDownFuture
})
}
def getAllSubscriptions: Array[SubscriptionInfo] = subscriptions.synchronized {
subscriptions.values.map(_.info).toArray
}
@ -808,7 +810,7 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P
}
}
def resetBatching(clientId: String, subscriptionName: String): Unit = {
lookup(clientId, subscriptionName).foreach { sub =>
sub.cursorPosition = 0
@ -819,13 +821,13 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P
sub.cursorPosition = db.cursorMessages(preparedAcks, key, listener, sub.cursorPosition.max(sub.lastAckPosition+1))
}
}
def recoverNextMessages(clientId: String, subscriptionName: String, maxReturned: Int, listener: MessageRecoveryListener): Unit = {
lookup(clientId, subscriptionName).foreach { sub =>
sub.cursorPosition = db.cursorMessages(preparedAcks, key, listener, sub.cursorPosition.max(sub.lastAckPosition+1), maxReturned)
}
}
def getMessageCount(clientId: String, subscriptionName: String): Int = {
lookup(clientId, subscriptionName) match {
case Some(sub) =>
@ -933,7 +935,7 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P
///////////////////////////////////////////////////////////////////////////
// The following methods actually have nothing to do with JMS txs... It's more like
// operation batch.. we handle that in the DBManager tho..
// operation batch.. we handle that in the DBManager tho..
///////////////////////////////////////////////////////////////////////////
def beginTransaction(context: ConnectionContext): Unit = {}
def commitTransaction(context: ConnectionContext): Unit = {}