diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala index e2bc595f74..6687aa33f6 100755 --- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala +++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala @@ -835,22 +835,28 @@ class LevelDBClient(store: LevelDBStore) { writeExecutor = null // this blocks until all io completes.. - // Suspend also deletes the index. - if( index!=null ) { - suspend() - index = null + snapshotRwLock.writeLock().lock() + try { + // Suspend also deletes the index. + if( index!=null ) { + storeCounters + index.put(DIRTY_INDEX_KEY, FALSE, new WriteOptions().sync(true)) + index.close + index = null + } + if (log.isOpen) { + log.close + copyDirtyIndexToSnapshot + wal_append_position = log.appender_limit + log = null + } + if( plist!=null ) { + plist.close + plist=null + } + } finally { + snapshotRwLock.writeLock().unlock() } - - if (log.isOpen) { - log.close - copyDirtyIndexToSnapshot - wal_append_position = log.appender_limit - } - if( plist!=null ) { - plist.close - plist=null - } - log = null } } @@ -875,10 +881,10 @@ class LevelDBClient(store: LevelDBStore) { // we will be closing it to create a consistent snapshot. snapshotRwLock.writeLock().lock() - // Close the index so that it's files are not changed async on us. storeCounters index.put(DIRTY_INDEX_KEY, FALSE, new WriteOptions().sync(true)) - index.close + // Suspend the index so that it's files are not changed async on us. + index.db.suspendCompactions() } /** @@ -887,10 +893,7 @@ class LevelDBClient(store: LevelDBStore) { */ def resume() = { // re=open it.. - retry { - index = new RichDB(factory.open(dirtyIndexFile, indexOptions)); - index.put(DIRTY_INDEX_KEY, TRUE) - } + index.db.resumeCompactions() snapshotRwLock.writeLock().unlock() }