diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala index 0cf0157be7..fee0051247 100644 --- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala +++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala @@ -331,9 +331,7 @@ class DelayableUOW(val manager:DBManager) extends BaseRetained { val s = size if( manager.asyncCapacityRemaining.addAndGet(-s) > 0 ) { asyncCapacityUsed = s - manager.parent.blocking_executor.execute(^{ - complete_listeners.foreach(_()) - }) + complete_listeners.foreach(_()) } else { manager.asyncCapacityRemaining.addAndGet(s) } @@ -352,9 +350,7 @@ class DelayableUOW(val manager:DBManager) extends BaseRetained { asyncCapacityUsed = 0 } else { manager.uow_complete_latency.add(System.nanoTime() - disposed_at) - manager.parent.blocking_executor.execute(^{ - complete_listeners.foreach(_()) - }) + complete_listeners.foreach(_()) } countDownFuture.set(null) @@ -665,19 +661,15 @@ class DBManager(val parent:LevelDBStore) { client.collectionIsEmpty(key) } - def cursorMessages(key:Long, listener:MessageRecoveryListener, startPos:Long) = { + def cursorMessages(preparedAcks:java.util.HashSet[MessageId], key:Long, listener:MessageRecoveryListener, startPos:Long, max:Long=Long.MaxValue) = { var lastmsgid:MessageId = null + var count = 0L client.queueCursor(key, startPos) { msg => - if( listener.hasSpace ) { - if( listener.recoverMessage(msg) ) { - lastmsgid = msg.getMessageId - true - } else { - false - } - } else { - false + if( !preparedAcks.contains(msg.getMessageId) && listener.recoverMessage(msg) ) { + lastmsgid = msg.getMessageId + count += 1 } + count < max } if( lastmsgid==null ) { startPos diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala index 7ba400b5ca..1d92ba08ed 100644 --- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala +++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala @@ -421,18 +421,23 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P } def commit(txid: TransactionId, wasPrepared: Boolean, preCommit: Runnable, postCommit: Runnable) = { - preCommit.run() transactions.remove(txid) match { case null => postCommit.run() case tx => val done = new CountDownLatch(1) - withUow { uow => - for( action <- tx.commitActions ) { - action.commit(uow) + // Ugly synchronization hack to make sure messages are ordered the way the cursor expects them. + transactions.synchronized { + withUow { uow => + for( action <- tx.commitActions ) { + action.commit(uow) + } + uow.syncFlag = true + uow.addCompleteListener { + preCommit.run() + done.countDown() + } } - uow.syncFlag = true - uow.addCompleteListener { done.countDown() } } done.await() if( tx.prepared ) { @@ -611,14 +616,15 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P case class LevelDBMessageStore(dest: ActiveMQDestination, val key: Long) extends AbstractMessageStore(dest) { - protected val lastSeq: AtomicLong = new AtomicLong(0) + val lastSeq: AtomicLong = new AtomicLong(0) protected var cursorPosition: Long = 0 val preparedAcks = new HashSet[MessageId]() lastSeq.set(db.getLastQueueEntrySeq(key)) def doAdd(uow: DelayableUOW, message: Message, delay:Boolean): CountDownFuture[AnyRef] = { - uow.enqueue(key, lastSeq.incrementAndGet, message, delay) + val seq = lastSeq.incrementAndGet() + uow.enqueue(key, seq, message, delay) } @@ -680,27 +686,7 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P } def recover(listener: MessageRecoveryListener): Unit = { - cursorPosition = db.cursorMessages(key, PreparedExcluding(listener), 0) - } - - case class PreparedExcluding(listener: MessageRecoveryListener) extends MessageRecoveryListener { - def isDuplicate(ref: MessageId) = listener.isDuplicate(ref) - def hasSpace = listener.hasSpace - def recoverMessageReference(ref: MessageId) = { - if (!preparedAcks.contains(ref)) { - listener.recoverMessageReference(ref) - } else { - true - } - } - - def recoverMessage(message: Message) = { - if (!preparedAcks.contains(message.getMessageId)) { - listener.recoverMessage(message) - } else { - true - } - } + cursorPosition = db.cursorMessages(preparedAcks, key, listener, 0) } def resetBatching: Unit = { @@ -708,32 +694,15 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P } def recoverNextMessages(maxReturned: Int, listener: MessageRecoveryListener): Unit = { - val limiting = LimitingRecoveryListener(maxReturned, listener) - val excluding = PreparedExcluding(limiting) - cursorPosition = db.cursorMessages(key, excluding, cursorPosition) + cursorPosition = db.cursorMessages(preparedAcks, key, listener, cursorPosition, maxReturned) } override def setBatch(id: MessageId): Unit = { - cursorPosition = db.queuePosition(id)+1 + cursorPosition = db.queuePosition(id) } } - case class LimitingRecoveryListener(max: Int, listener: MessageRecoveryListener) extends MessageRecoveryListener { - var recovered: Int = 0 - def hasSpace = recovered < max - def recoverMessage(message: Message) = { - recovered += 1; - listener.recoverMessage(message) - } - def recoverMessageReference(ref: MessageId) = { - recovered += 1; - listener.recoverMessageReference(ref) - } - def isDuplicate(ref: MessageId) = listener.isDuplicate(ref) - } - - // // This gts called when the store is first loading up, it restores // the existing durable subs.. @@ -849,13 +818,13 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P } def recoverSubscription(clientId: String, subscriptionName: String, listener: MessageRecoveryListener): Unit = { lookup(clientId, subscriptionName).foreach { sub => - sub.cursorPosition = db.cursorMessages(key, listener, sub.cursorPosition.max(sub.lastAckPosition+1)) + sub.cursorPosition = db.cursorMessages(preparedAcks, key, listener, sub.cursorPosition.max(sub.lastAckPosition+1)) } } def recoverNextMessages(clientId: String, subscriptionName: String, maxReturned: Int, listener: MessageRecoveryListener): Unit = { lookup(clientId, subscriptionName).foreach { sub => - sub.cursorPosition = db.cursorMessages(key, PreparedExcluding(LimitingRecoveryListener(maxReturned, listener)), sub.cursorPosition.max(sub.lastAckPosition+1)) + sub.cursorPosition = db.cursorMessages(preparedAcks, key, listener, sub.cursorPosition.max(sub.lastAckPosition+1), maxReturned) } }