AMQ-4005 : Also support pluggable storage lockers for the LevelDB store.

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1411901 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Hiram R. Chirino 2012-11-20 22:02:00 +00:00
parent 72dfbfa283
commit 2751497e38
3 changed files with 13 additions and 27 deletions

View File

@ -310,7 +310,7 @@ class DelayableUOW(val manager:DBManager) extends BaseRetained {
if( manager.asyncCapacityRemaining.addAndGet(-s) > 0 ) { if( manager.asyncCapacityRemaining.addAndGet(-s) > 0 ) {
asyncCapacityUsed = s asyncCapacityUsed = s
countDownFuture.countDown countDownFuture.countDown
manager.parent.brokerService.getTaskRunnerFactory.execute(^{ manager.parent.broker_service.getTaskRunnerFactory.execute(^{
complete_listeners.foreach(_()) complete_listeners.foreach(_())
}) })
} else { } else {
@ -332,7 +332,7 @@ class DelayableUOW(val manager:DBManager) extends BaseRetained {
} else { } else {
manager.uow_complete_latency.add(System.nanoTime() - disposed_at) manager.uow_complete_latency.add(System.nanoTime() - disposed_at)
countDownFuture.countDown countDownFuture.countDown
manager.parent.brokerService.getTaskRunnerFactory.execute(^{ manager.parent.broker_service.getTaskRunnerFactory.execute(^{
complete_listeners.foreach(_()) complete_listeners.foreach(_())
}) })
} }

View File

@ -17,9 +17,7 @@
package org.apache.activemq.leveldb package org.apache.activemq.leveldb
import org.apache.activemq.broker.BrokerService import org.apache.activemq.broker.{LockableServiceSupport, BrokerService, BrokerServiceAware, ConnectionContext}
import org.apache.activemq.broker.BrokerServiceAware
import org.apache.activemq.broker.ConnectionContext
import org.apache.activemq.command._ import org.apache.activemq.command._
import org.apache.activemq.openwire.OpenWireFormat import org.apache.activemq.openwire.OpenWireFormat
import org.apache.activemq.usage.SystemUsage import org.apache.activemq.usage.SystemUsage
@ -113,7 +111,7 @@ class LevelDBStoreView(val store:LevelDBStore) extends LevelDBStoreViewMBean {
import LevelDBStore._ import LevelDBStore._
class LevelDBStore extends ServiceSupport with BrokerServiceAware with PersistenceAdapter with TransactionStore { class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with PersistenceAdapter with TransactionStore {
final val wireFormat = new OpenWireFormat final val wireFormat = new OpenWireFormat
final val db = new DBManager(this) final val db = new DBManager(this)
@ -153,16 +151,21 @@ class LevelDBStore extends ServiceSupport with BrokerServiceAware with Persisten
var asyncBufferSize = 1024*1024*4 var asyncBufferSize = 1024*1024*4
@BeanProperty @BeanProperty
var monitorStats = false var monitorStats = false
@BeanProperty
var failIfLocked = false
var purgeOnStatup: Boolean = false var purgeOnStatup: Boolean = false
var brokerService: BrokerService = null
val queues = collection.mutable.HashMap[ActiveMQQueue, LevelDBStore#LevelDBMessageStore]() val queues = collection.mutable.HashMap[ActiveMQQueue, LevelDBStore#LevelDBMessageStore]()
val topics = collection.mutable.HashMap[ActiveMQTopic, LevelDBStore#LevelDBTopicMessageStore]() val topics = collection.mutable.HashMap[ActiveMQTopic, LevelDBStore#LevelDBTopicMessageStore]()
val topicsById = collection.mutable.HashMap[Long, LevelDBStore#LevelDBTopicMessageStore]() val topicsById = collection.mutable.HashMap[Long, LevelDBStore#LevelDBTopicMessageStore]()
def init() = {}
def createDefaultLocker() = {
var locker = new SharedFileLocker();
locker.configure(this);
locker
}
override def toString: String = { override def toString: String = {
return "LevelDB:[" + directory.getAbsolutePath + "]" return "LevelDB:[" + directory.getAbsolutePath + "]"
} }
@ -177,8 +180,6 @@ class LevelDBStore extends ServiceSupport with BrokerServiceAware with Persisten
def retry[T](func : =>T):T = RetrySupport.retry(LevelDBStore, isStarted, func _) def retry[T](func : =>T):T = RetrySupport.retry(LevelDBStore, isStarted, func _)
var lock_file: LockFile = _
var snappyCompressLogs = false var snappyCompressLogs = false
def doStart: Unit = { def doStart: Unit = {
@ -186,9 +187,6 @@ class LevelDBStore extends ServiceSupport with BrokerServiceAware with Persisten
snappyCompressLogs = logCompression.toLowerCase == "snappy" && Snappy != null snappyCompressLogs = logCompression.toLowerCase == "snappy" && Snappy != null
debug("starting") debug("starting")
if ( lock_file==null ) {
lock_file = new LockFile(directory / "lock", true)
}
// Expose a JMX bean to expose the status of the store. // Expose a JMX bean to expose the status of the store.
if(brokerService!=null){ if(brokerService!=null){
@ -201,14 +199,6 @@ class LevelDBStore extends ServiceSupport with BrokerServiceAware with Persisten
} }
} }
if (failIfLocked) {
lock_file.lock()
} else {
retry {
lock_file.lock()
}
}
if (purgeOnStatup) { if (purgeOnStatup) {
purgeOnStatup = false purgeOnStatup = false
db.client.locked_purge db.client.locked_purge
@ -247,16 +237,13 @@ class LevelDBStore extends ServiceSupport with BrokerServiceAware with Persisten
def doStop(stopper: ServiceStopper): Unit = { def doStop(stopper: ServiceStopper): Unit = {
db.stop db.stop
lock_file.unlock()
if(brokerService!=null){ if(brokerService!=null){
brokerService.getManagementContext().unregisterMBean(objectName); brokerService.getManagementContext().unregisterMBean(objectName);
} }
info("Stopped "+this) info("Stopped "+this)
} }
def setBrokerService(brokerService: BrokerService): Unit = { def broker_service = brokerService
this.brokerService = brokerService
}
def setBrokerName(brokerName: String): Unit = { def setBrokerName(brokerName: String): Unit = {
} }

View File

@ -38,7 +38,6 @@ object RetrySupport {
rc = Some(func()) rc = Some(func())
} catch { } catch {
case e:Throwable => case e:Throwable =>
e.printStackTrace()
if( error==null ) { if( error==null ) {
warn(e, "DB operation failed. (entering recovery mode)") warn(e, "DB operation failed. (entering recovery mode)")
} }