Fix for potential deadlock when external classes synchronize on the
LevelDBStore instance which can deadlock the hawtDispatch runner thread
if a task also attempts to take the lock to protect some mutable state
values.
This commit is contained in:
Timothy Bish 2014-07-07 17:53:46 -04:00
parent c6d0aaa81b
commit e62e90abaf
1 changed files with 11 additions and 9 deletions

View File

@ -184,6 +184,8 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P
val topicsById = collection.mutable.HashMap[Long, LevelDBStore#LevelDBTopicMessageStore]() val topicsById = collection.mutable.HashMap[Long, LevelDBStore#LevelDBTopicMessageStore]()
val plists = collection.mutable.HashMap[String, LevelDBStore#LevelDBPList]() val plists = collection.mutable.HashMap[String, LevelDBStore#LevelDBPList]()
private val lock = new Object();
def check_running = { def check_running = {
if( this.isStopped ) { if( this.isStopped ) {
throw new SuppressReplyException("Store has been stopped") throw new SuppressReplyException("Store has been stopped")
@ -540,12 +542,12 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P
def getPList(name: String): PList = { def getPList(name: String): PList = {
this.synchronized(plists.get(name)).getOrElse(db.createPList(name)) lock.synchronized(plists.get(name)).getOrElse(db.createPList(name))
} }
def createPList(name: String, key: Long):LevelDBStore#LevelDBPList = { def createPList(name: String, key: Long):LevelDBStore#LevelDBPList = {
var rc = new LevelDBPList(name, key) var rc = new LevelDBPList(name, key)
this.synchronized { lock.synchronized {
plists.put(name, rc) plists.put(name, rc)
} }
rc rc
@ -573,30 +575,30 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P
} }
def createQueueMessageStore(destination: ActiveMQQueue):LevelDBStore#LevelDBMessageStore = { def createQueueMessageStore(destination: ActiveMQQueue):LevelDBStore#LevelDBMessageStore = {
this.synchronized(queues.get(destination)).getOrElse(db.createQueueStore(destination)) lock.synchronized(queues.get(destination)).getOrElse(db.createQueueStore(destination))
} }
def createQueueMessageStore(destination: ActiveMQQueue, key: Long):LevelDBStore#LevelDBMessageStore = { def createQueueMessageStore(destination: ActiveMQQueue, key: Long):LevelDBStore#LevelDBMessageStore = {
var rc = new LevelDBMessageStore(destination, key) var rc = new LevelDBMessageStore(destination, key)
this.synchronized { lock.synchronized {
queues.put(destination, rc) queues.put(destination, rc)
} }
rc rc
} }
def removeQueueMessageStore(destination: ActiveMQQueue): Unit = this synchronized { def removeQueueMessageStore(destination: ActiveMQQueue): Unit = lock synchronized {
queues.remove(destination).foreach { store=> queues.remove(destination).foreach { store=>
db.destroyQueueStore(store.key) db.destroyQueueStore(store.key)
} }
} }
def createTopicMessageStore(destination: ActiveMQTopic):LevelDBStore#LevelDBTopicMessageStore = { def createTopicMessageStore(destination: ActiveMQTopic):LevelDBStore#LevelDBTopicMessageStore = {
this.synchronized(topics.get(destination)).getOrElse(db.createTopicStore(destination)) lock.synchronized(topics.get(destination)).getOrElse(db.createTopicStore(destination))
} }
def createTopicMessageStore(destination: ActiveMQTopic, key: Long):LevelDBStore#LevelDBTopicMessageStore = { def createTopicMessageStore(destination: ActiveMQTopic, key: Long):LevelDBStore#LevelDBTopicMessageStore = {
var rc = new LevelDBTopicMessageStore(destination, key) var rc = new LevelDBTopicMessageStore(destination, key)
this synchronized { lock synchronized {
topics.put(destination, rc) topics.put(destination, rc)
topicsById.put(key, rc) topicsById.put(key, rc)
} }
@ -777,7 +779,7 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P
// This gts called when the store is first loading up, it restores // This gts called when the store is first loading up, it restores
// the existing durable subs.. // the existing durable subs..
def createSubscription(sub:DurableSubscription) = { def createSubscription(sub:DurableSubscription) = {
this.synchronized(topicsById.get(sub.topicKey)) match { lock.synchronized(topicsById.get(sub.topicKey)) match {
case Some(topic) => case Some(topic) =>
topic.synchronized { topic.synchronized {
topic.subscriptions.put((sub.info.getClientId, sub.info.getSubcriptionName), sub) topic.subscriptions.put((sub.info.getClientId, sub.info.getSubcriptionName), sub)
@ -790,7 +792,7 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P
def getTopicGCPositions = { def getTopicGCPositions = {
import collection.JavaConversions._ import collection.JavaConversions._
val topics = this.synchronized { val topics = lock.synchronized {
new ArrayList(topicsById.values()) new ArrayList(topicsById.values())
} }
topics.flatMap(_.gcPosition).toSeq topics.flatMap(_.gcPosition).toSeq