Adding assertions to make sure that we only append to the log from the write thread. Found a code path that was appending to the log from a different thread. This might have been affecting https://issues.apache.org/jira/browse/AMQ-4882

This commit is contained in:
Hiram Chirino 2013-12-03 12:04:40 -05:00
parent f0334862a2
commit 5fa462a08a
2 changed files with 14 additions and 6 deletions

View File

@ -798,7 +798,7 @@ class DBManager(val parent:LevelDBStore) {
def createTransactionContainer(id:XATransactionId) =
createCollection(buffer(parent.wireFormat.marshal(id)), TRANSACTION_COLLECTION_TYPE)
def removeTransactionContainer(key:Long) = { // writeExecutor.sync {
def removeTransactionContainer(key:Long) = writeExecutor.sync {
client.removeCollection(key)
}

View File

@ -59,6 +59,10 @@ import org.apache.activemq.broker.SuppressReplyException
*/
object LevelDBClient extends Log {
class WriteThread(r:Runnable) extends Thread(r) {
setDaemon(true)
}
final val STORE_SCHEMA_PREFIX = "activemq_leveldb_store:"
final val STORE_SCHEMA_VERSION = 1
@ -512,6 +516,7 @@ class LevelDBClient(store: LevelDBStore) {
}
def storeTrace(ascii:String, force:Boolean=false) = {
assert_write_thread_executing
val time = new SimpleDateFormat("dd/MMM/yyyy:HH:mm::ss Z").format(new Date)
log.appender { appender =>
appender.append(LOG_TRACE, new AsciiBuffer("%s: %s".format(time, ascii)))
@ -566,6 +571,8 @@ class LevelDBClient(store: LevelDBStore) {
replay_write_batch = null;
}
def assert_write_thread_executing = assert(Thread.currentThread().getClass == classOf[WriteThread])
def init() ={
// Lets check store compatibility...
@ -590,11 +597,7 @@ class LevelDBClient(store: LevelDBStore) {
version_file.writeText(STORE_SCHEMA_PREFIX + STORE_SCHEMA_VERSION)
writeExecutor = Executors.newFixedThreadPool(1, new ThreadFactory() {
def newThread(r: Runnable) = {
val rc = new Thread(r, "LevelDB store io write")
rc.setDaemon(true)
rc
}
def newThread(r: Runnable) = new WriteThread(r)
})
val factoryNames = store.indexFactory
@ -1125,6 +1128,8 @@ class LevelDBClient(store: LevelDBStore) {
}
def addCollection(record: CollectionRecord.Buffer) = {
assert_write_thread_executing
val key = encodeLongKey(COLLECTION_PREFIX, record.getKey)
val value = record.toUnframedBuffer
might_fail_using_index {
@ -1153,6 +1158,7 @@ class LevelDBClient(store: LevelDBStore) {
}
def removeCollection(collectionKey: Long) = {
assert_write_thread_executing
val key = encodeLongKey(COLLECTION_PREFIX, collectionKey)
val value = encodeVLong(collectionKey)
val entryKeyPrefix = encodeLongKey(ENTRY_PREFIX, collectionKey)
@ -1181,6 +1187,7 @@ class LevelDBClient(store: LevelDBStore) {
}
def collectionEmpty(collectionKey: Long) = {
assert_write_thread_executing
val key = encodeLongKey(COLLECTION_PREFIX, collectionKey)
val value = encodeVLong(collectionKey)
val entryKeyPrefix = encodeLongKey(ENTRY_PREFIX, collectionKey)
@ -1366,6 +1373,7 @@ class LevelDBClient(store: LevelDBStore) {
val max_index_write_latency = TimeMetric()
def store(uows: Array[DelayableUOW]) {
assert_write_thread_executing
might_fail_using_index {
log.appender { appender =>
val syncNeeded = index.write(new WriteOptions, max_index_write_latency) { batch =>