mirror of https://github.com/apache/activemq.git
Seems like on some machines leveldb index updates are delayed.. looping seems to fix it.
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1485897 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
9200635eeb
commit
75245da626
|
@ -108,4 +108,7 @@ public interface LevelDBStoreViewMBean {
|
|||
@MBeanInfo("Compacts disk usage")
|
||||
void compact();
|
||||
|
||||
@MBeanInfo("Are delayed index updates occurring?")
|
||||
boolean getDelayedIndexUpdates();
|
||||
|
||||
}
|
||||
|
|
|
@ -491,6 +491,10 @@ class LevelDBClient(store: LevelDBStore) {
|
|||
|
||||
var writeExecutor:ExecutorService = _
|
||||
|
||||
def writeExecutorExec(func: =>Unit ) = writeExecutor {
|
||||
func
|
||||
}
|
||||
|
||||
def storeTrace(ascii:String, force:Boolean=false) = {
|
||||
val time = new SimpleDateFormat("dd/MMM/yyyy:HH:mm::ss Z").format(new Date)
|
||||
log.appender { appender =>
|
||||
|
|
|
@ -98,6 +98,7 @@ class LevelDBStoreView(val store:LevelDBStore) extends LevelDBStoreViewMBean {
|
|||
def getParanoidChecks = paranoidChecks
|
||||
def getSync = sync
|
||||
def getVerifyChecksums = verifyChecksums
|
||||
def getDelayedIndexUpdates = delayedIndexUpdates
|
||||
|
||||
def getUowClosedCounter = db.uowClosedCounter
|
||||
def getUowCanceledCounter = db.uowCanceledCounter
|
||||
|
@ -183,6 +184,7 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P
|
|||
val topics = collection.mutable.HashMap[ActiveMQTopic, LevelDBStore#LevelDBTopicMessageStore]()
|
||||
val topicsById = collection.mutable.HashMap[Long, LevelDBStore#LevelDBTopicMessageStore]()
|
||||
val plists = collection.mutable.HashMap[String, LevelDBStore#LevelDBPList]()
|
||||
var delayedIndexUpdates = false
|
||||
|
||||
def init() = {}
|
||||
|
||||
|
@ -706,8 +708,35 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P
|
|||
}
|
||||
|
||||
def recoverNextMessages(maxReturned: Int, listener: MessageRecoveryListener): Unit = {
|
||||
val excluding = PreparedExcluding(LimitingRecoveryListener(maxReturned, listener))
|
||||
cursorPosition = db.cursorMessages(key, excluding, cursorPosition)
|
||||
var found = false
|
||||
var counter = 0;
|
||||
while( !found ) {
|
||||
val limiting = LimitingRecoveryListener(maxReturned, listener)
|
||||
val excluding = PreparedExcluding(limiting)
|
||||
cursorPosition = db.cursorMessages(key, excluding, cursorPosition)
|
||||
if( limiting.recovered > 0 ) {
|
||||
if( !delayedIndexUpdates && counter>0 ) {
|
||||
info("This machine seems to have delayed index updates.")
|
||||
delayedIndexUpdates = true
|
||||
}
|
||||
found = true
|
||||
} else {
|
||||
// Seems like on some systems it takes a while for leveldb index updates
|
||||
// to become visible for read. Need to figure out why this is, but until
|
||||
// then, lets loop until we can read it.
|
||||
if( counter > 10 ) {
|
||||
found = true
|
||||
} else {
|
||||
counter+=1
|
||||
// lets try to sync up /w the write thread..
|
||||
val t = new CountDownLatch(1)
|
||||
client.writeExecutorExec {
|
||||
t.countDown()
|
||||
}
|
||||
t.await()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
override def setBatch(id: MessageId): Unit = {
|
||||
|
|
Loading…
Reference in New Issue