diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala index 45320c3502..97d455dc3b 100644 --- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala +++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala @@ -792,6 +792,7 @@ class DBManager(val parent:LevelDBStore) { var sub = DurableSubscription(key, sr.getTopicKey, info) sub.lastAckPosition = client.getAckPosition(key); + sub.gcPosition = sub.lastAckPosition parent.createSubscription(sub) case TRANSACTION_COLLECTION_TYPE => val meta = record.getMeta diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala index 8adeb539c0..c6ea02db97 100755 --- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala +++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala @@ -1297,10 +1297,7 @@ class LevelDBClient(store: LevelDBStore) { index_record.setValueLength(dataLocator.len) batch.put(key, encodeEntryRecord(index_record.freeze()).toByteArray) - val log_data = encodeEntryRecord(log_record.freeze()) val index_data = encodeEntryRecord(index_record.freeze()).toByteArray - - appender.append(LOG_ADD_ENTRY, log_data) batch.put(key, index_data) for (key <- logRefKey(dataLocator.pos, log_info)) { @@ -1322,7 +1319,7 @@ class LevelDBClient(store: LevelDBStore) { throw new RuntimeException("Unexpected locator type") } } - println(dataLocator) +// println(dataLocator) val el = ack.getLastMessageId.getEntryLocator.asInstanceOf[EntryLocator]; val os = new DataByteArrayOutputStream() diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala index 62d271a716..9945be6fbb 100644 --- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala +++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala @@ -76,6 +76,7 @@ object LevelDBStore extends Log { } case class DurableSubscription(subKey:Long, topicKey:Long, info: SubscriptionInfo) { + var gcPosition = 0L var lastAckPosition = 0L var cursorPosition = 0L } @@ -381,6 +382,7 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P def commit(uow:DelayableUOW) = { store.doUpdateAckPosition(uow, sub, position) + sub.gcPosition = position } def prepare(uow:DelayableUOW) = { prev_position = sub.lastAckPosition @@ -756,8 +758,8 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P var pos = lastSeq.get() subscriptions.synchronized { subscriptions.values.foreach { sub => - if( sub.lastAckPosition < pos ) { - pos = sub.lastAckPosition + if( sub.gcPosition < pos ) { + pos = sub.gcPosition } } if( firstSeq != pos+1) { @@ -775,6 +777,7 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P subscriptions.put((info.getClientId, info.getSubcriptionName), sub) } sub.lastAckPosition = if (retroactive) 0 else lastSeq.get() + sub.gcPosition = sub.lastAckPosition waitOn(withUow{ uow=> uow.updateAckPosition(sub.subKey, sub.lastAckPosition) uow.countDownFuture @@ -801,6 +804,7 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P def doUpdateAckPosition(uow: DelayableUOW, sub: DurableSubscription, position: Long) = { sub.lastAckPosition = position + sub.gcPosition = position uow.updateAckPosition(sub.subKey, sub.lastAckPosition) }