diff --git a/activemq-leveldb-store/src/main/java/org/apache/activemq/leveldb/LevelDBStoreViewMBean.java b/activemq-leveldb-store/src/main/java/org/apache/activemq/leveldb/LevelDBStoreViewMBean.java index 68507950ae..015c5636a5 100644 --- a/activemq-leveldb-store/src/main/java/org/apache/activemq/leveldb/LevelDBStoreViewMBean.java +++ b/activemq-leveldb-store/src/main/java/org/apache/activemq/leveldb/LevelDBStoreViewMBean.java @@ -108,4 +108,7 @@ public interface LevelDBStoreViewMBean { @MBeanInfo("Compacts disk usage") void compact(); + @MBeanInfo("Are delayed index updates occurring?") + boolean getDelayedIndexUpdates(); + } diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala index c6ea02db97..027478f55b 100755 --- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala +++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala @@ -491,6 +491,10 @@ class LevelDBClient(store: LevelDBStore) { var writeExecutor:ExecutorService = _ + def writeExecutorExec(func: =>Unit ) = writeExecutor { + func + } + def storeTrace(ascii:String, force:Boolean=false) = { val time = new SimpleDateFormat("dd/MMM/yyyy:HH:mm::ss Z").format(new Date) log.appender { appender => diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala index 2015c9948a..d746ffd3ba 100644 --- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala +++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala @@ -98,6 +98,7 @@ class LevelDBStoreView(val store:LevelDBStore) extends LevelDBStoreViewMBean { def getParanoidChecks = paranoidChecks def getSync = sync def getVerifyChecksums = verifyChecksums + def getDelayedIndexUpdates = delayedIndexUpdates def getUowClosedCounter = db.uowClosedCounter def getUowCanceledCounter = db.uowCanceledCounter @@ -183,6 +184,7 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P val topics = collection.mutable.HashMap[ActiveMQTopic, LevelDBStore#LevelDBTopicMessageStore]() val topicsById = collection.mutable.HashMap[Long, LevelDBStore#LevelDBTopicMessageStore]() val plists = collection.mutable.HashMap[String, LevelDBStore#LevelDBPList]() + var delayedIndexUpdates = false def init() = {} @@ -706,8 +708,35 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P } def recoverNextMessages(maxReturned: Int, listener: MessageRecoveryListener): Unit = { - val excluding = PreparedExcluding(LimitingRecoveryListener(maxReturned, listener)) - cursorPosition = db.cursorMessages(key, excluding, cursorPosition) + var found = false + var counter = 0; + while( !found ) { + val limiting = LimitingRecoveryListener(maxReturned, listener) + val excluding = PreparedExcluding(limiting) + cursorPosition = db.cursorMessages(key, excluding, cursorPosition) + if( limiting.recovered > 0 ) { + if( !delayedIndexUpdates && counter>0 ) { + info("This machine seems to have delayed index updates.") + delayedIndexUpdates = true + } + found = true + } else { + // Seems like on some systems it takes a while for leveldb index updates + // to become visible for read. Need to figure out why this is, but until + // then, lets loop until we can read it. + if( counter > 10 ) { + found = true + } else { + counter+=1 + // lets try to sync up /w the write thread.. + val t = new CountDownLatch(1) + client.writeExecutorExec { + t.countDown() + } + t.await() + } + } + } } override def setBatch(id: MessageId): Unit = {