diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala index b0c9a823f2..796b3609c4 100644 --- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala +++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala @@ -310,7 +310,7 @@ class DelayableUOW(val manager:DBManager) extends BaseRetained { if( manager.asyncCapacityRemaining.addAndGet(-s) > 0 ) { asyncCapacityUsed = s countDownFuture.countDown - manager.parent.brokerService.getTaskRunnerFactory.execute(^{ + manager.parent.broker_service.getTaskRunnerFactory.execute(^{ complete_listeners.foreach(_()) }) } else { @@ -332,7 +332,7 @@ class DelayableUOW(val manager:DBManager) extends BaseRetained { } else { manager.uow_complete_latency.add(System.nanoTime() - disposed_at) countDownFuture.countDown - manager.parent.brokerService.getTaskRunnerFactory.execute(^{ + manager.parent.broker_service.getTaskRunnerFactory.execute(^{ complete_listeners.foreach(_()) }) } diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala index 6c852ff3b5..93e736a5bc 100644 --- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala +++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala @@ -17,9 +17,7 @@ package org.apache.activemq.leveldb -import org.apache.activemq.broker.BrokerService -import org.apache.activemq.broker.BrokerServiceAware -import org.apache.activemq.broker.ConnectionContext +import org.apache.activemq.broker.{LockableServiceSupport, BrokerService, BrokerServiceAware, ConnectionContext} import org.apache.activemq.command._ import org.apache.activemq.openwire.OpenWireFormat import org.apache.activemq.usage.SystemUsage @@ -113,7 +111,7 @@ class LevelDBStoreView(val store:LevelDBStore) extends LevelDBStoreViewMBean { import LevelDBStore._ -class LevelDBStore extends ServiceSupport with BrokerServiceAware with PersistenceAdapter with TransactionStore { +class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with PersistenceAdapter with TransactionStore { final val wireFormat = new OpenWireFormat final val db = new DBManager(this) @@ -153,16 +151,21 @@ class LevelDBStore extends ServiceSupport with BrokerServiceAware with Persisten var asyncBufferSize = 1024*1024*4 @BeanProperty var monitorStats = false - @BeanProperty - var failIfLocked = false var purgeOnStatup: Boolean = false - var brokerService: BrokerService = null val queues = collection.mutable.HashMap[ActiveMQQueue, LevelDBStore#LevelDBMessageStore]() val topics = collection.mutable.HashMap[ActiveMQTopic, LevelDBStore#LevelDBTopicMessageStore]() val topicsById = collection.mutable.HashMap[Long, LevelDBStore#LevelDBTopicMessageStore]() + def init() = {} + + def createDefaultLocker() = { + var locker = new SharedFileLocker(); + locker.configure(this); + locker + } + override def toString: String = { return "LevelDB:[" + directory.getAbsolutePath + "]" } @@ -177,8 +180,6 @@ class LevelDBStore extends ServiceSupport with BrokerServiceAware with Persisten def retry[T](func : =>T):T = RetrySupport.retry(LevelDBStore, isStarted, func _) - var lock_file: LockFile = _ - var snappyCompressLogs = false def doStart: Unit = { @@ -186,9 +187,6 @@ class LevelDBStore extends ServiceSupport with BrokerServiceAware with Persisten snappyCompressLogs = logCompression.toLowerCase == "snappy" && Snappy != null debug("starting") - if ( lock_file==null ) { - lock_file = new LockFile(directory / "lock", true) - } // Expose a JMX bean to expose the status of the store. if(brokerService!=null){ @@ -201,14 +199,6 @@ class LevelDBStore extends ServiceSupport with BrokerServiceAware with Persisten } } - if (failIfLocked) { - lock_file.lock() - } else { - retry { - lock_file.lock() - } - } - if (purgeOnStatup) { purgeOnStatup = false db.client.locked_purge @@ -247,16 +237,13 @@ class LevelDBStore extends ServiceSupport with BrokerServiceAware with Persisten def doStop(stopper: ServiceStopper): Unit = { db.stop - lock_file.unlock() if(brokerService!=null){ brokerService.getManagementContext().unregisterMBean(objectName); } info("Stopped "+this) } - def setBrokerService(brokerService: BrokerService): Unit = { - this.brokerService = brokerService - } + def broker_service = brokerService def setBrokerName(brokerName: String): Unit = { } diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/util/RetrySupport.scala b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/util/RetrySupport.scala index 5d492def61..ad75cfe358 100644 --- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/util/RetrySupport.scala +++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/util/RetrySupport.scala @@ -38,7 +38,6 @@ object RetrySupport { rc = Some(func()) } catch { case e:Throwable => - e.printStackTrace() if( error==null ) { warn(e, "DB operation failed. (entering recovery mode)") }