mirror of https://github.com/apache/activemq.git
[LevelDB]
https://issues.apache.org/jira/browse/AMQ-4296 Fixes remainder of failing unit tests. The LevelDB wasn't incrementing or decrementing reference counts on messages added to the store which causes the expectations of certain memory limit based tests to fail as the memory usage was being updates after the store add instead of during so a message could get placed into the batch list of a cursor when we did not expect that it would. This could also cause a browse to return fewer message than we want as the in memory messages would top out the usage limit so we'd never page in one batch of messages. git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1505805 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
20f384ce59
commit
194c6535cd
|
@ -624,10 +624,13 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P
|
|||
|
||||
def doAdd(uow: DelayableUOW, message: Message, delay:Boolean): CountDownFuture[AnyRef] = {
|
||||
val seq = lastSeq.incrementAndGet()
|
||||
message.incrementReferenceCount()
|
||||
uow.addCompleteListener({
|
||||
message.decrementReferenceCount()
|
||||
})
|
||||
uow.enqueue(key, seq, message, delay)
|
||||
}
|
||||
|
||||
|
||||
override def asyncAddQueueMessage(context: ConnectionContext, message: Message) = asyncAddQueueMessage(context, message, false)
|
||||
override def asyncAddQueueMessage(context: ConnectionContext, message: Message, delay: Boolean): Future[AnyRef] = {
|
||||
message.getMessageId.setEntryLocator(null)
|
||||
|
@ -718,7 +721,6 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
def getTopicGCPositions = {
|
||||
import collection.JavaConversions._
|
||||
val topics = this.synchronized {
|
||||
|
|
Loading…
Reference in New Issue