diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala index e7aeb7421e..7f6e193439 100644 --- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala +++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala @@ -624,10 +624,13 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P def doAdd(uow: DelayableUOW, message: Message, delay:Boolean): CountDownFuture[AnyRef] = { val seq = lastSeq.incrementAndGet() + message.incrementReferenceCount() + uow.addCompleteListener({ + message.decrementReferenceCount() + }) uow.enqueue(key, seq, message, delay) } - override def asyncAddQueueMessage(context: ConnectionContext, message: Message) = asyncAddQueueMessage(context, message, false) override def asyncAddQueueMessage(context: ConnectionContext, message: Message, delay: Boolean): Future[AnyRef] = { message.getMessageId.setEntryLocator(null) @@ -718,7 +721,6 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P } } - def getTopicGCPositions = { import collection.JavaConversions._ val topics = this.synchronized {