mirror of https://github.com/apache/activemq.git
This should fixed the problem with the delayed leveldb index updates.
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1486242 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
3ff2d9294c
commit
b41121e359
|
@ -108,7 +108,4 @@ public interface LevelDBStoreViewMBean {
|
||||||
@MBeanInfo("Compacts disk usage")
|
@MBeanInfo("Compacts disk usage")
|
||||||
void compact();
|
void compact();
|
||||||
|
|
||||||
@MBeanInfo("Are delayed index updates occurring?")
|
|
||||||
boolean getDelayedIndexUpdates();
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -331,7 +331,6 @@ class DelayableUOW(val manager:DBManager) extends BaseRetained {
|
||||||
val s = size
|
val s = size
|
||||||
if( manager.asyncCapacityRemaining.addAndGet(-s) > 0 ) {
|
if( manager.asyncCapacityRemaining.addAndGet(-s) > 0 ) {
|
||||||
asyncCapacityUsed = s
|
asyncCapacityUsed = s
|
||||||
countDownFuture.set(null)
|
|
||||||
manager.parent.blocking_executor.execute(^{
|
manager.parent.blocking_executor.execute(^{
|
||||||
complete_listeners.foreach(_())
|
complete_listeners.foreach(_())
|
||||||
})
|
})
|
||||||
|
@ -353,11 +352,11 @@ class DelayableUOW(val manager:DBManager) extends BaseRetained {
|
||||||
asyncCapacityUsed = 0
|
asyncCapacityUsed = 0
|
||||||
} else {
|
} else {
|
||||||
manager.uow_complete_latency.add(System.nanoTime() - disposed_at)
|
manager.uow_complete_latency.add(System.nanoTime() - disposed_at)
|
||||||
countDownFuture.set(null)
|
|
||||||
manager.parent.blocking_executor.execute(^{
|
manager.parent.blocking_executor.execute(^{
|
||||||
complete_listeners.foreach(_())
|
complete_listeners.foreach(_())
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
countDownFuture.set(null)
|
||||||
|
|
||||||
for( (id, action) <- actions ) {
|
for( (id, action) <- actions ) {
|
||||||
if( !action.enqueues.isEmpty ) {
|
if( !action.enqueues.isEmpty ) {
|
||||||
|
|
|
@ -98,7 +98,6 @@ class LevelDBStoreView(val store:LevelDBStore) extends LevelDBStoreViewMBean {
|
||||||
def getParanoidChecks = paranoidChecks
|
def getParanoidChecks = paranoidChecks
|
||||||
def getSync = sync
|
def getSync = sync
|
||||||
def getVerifyChecksums = verifyChecksums
|
def getVerifyChecksums = verifyChecksums
|
||||||
def getDelayedIndexUpdates = delayedIndexUpdates
|
|
||||||
|
|
||||||
def getUowClosedCounter = db.uowClosedCounter
|
def getUowClosedCounter = db.uowClosedCounter
|
||||||
def getUowCanceledCounter = db.uowCanceledCounter
|
def getUowCanceledCounter = db.uowCanceledCounter
|
||||||
|
@ -184,7 +183,6 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P
|
||||||
val topics = collection.mutable.HashMap[ActiveMQTopic, LevelDBStore#LevelDBTopicMessageStore]()
|
val topics = collection.mutable.HashMap[ActiveMQTopic, LevelDBStore#LevelDBTopicMessageStore]()
|
||||||
val topicsById = collection.mutable.HashMap[Long, LevelDBStore#LevelDBTopicMessageStore]()
|
val topicsById = collection.mutable.HashMap[Long, LevelDBStore#LevelDBTopicMessageStore]()
|
||||||
val plists = collection.mutable.HashMap[String, LevelDBStore#LevelDBPList]()
|
val plists = collection.mutable.HashMap[String, LevelDBStore#LevelDBPList]()
|
||||||
var delayedIndexUpdates = false
|
|
||||||
|
|
||||||
def init() = {}
|
def init() = {}
|
||||||
|
|
||||||
|
@ -708,35 +706,9 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P
|
||||||
}
|
}
|
||||||
|
|
||||||
def recoverNextMessages(maxReturned: Int, listener: MessageRecoveryListener): Unit = {
|
def recoverNextMessages(maxReturned: Int, listener: MessageRecoveryListener): Unit = {
|
||||||
var found = false
|
|
||||||
var counter = 0;
|
|
||||||
while( !found ) {
|
|
||||||
val limiting = LimitingRecoveryListener(maxReturned, listener)
|
val limiting = LimitingRecoveryListener(maxReturned, listener)
|
||||||
val excluding = PreparedExcluding(limiting)
|
val excluding = PreparedExcluding(limiting)
|
||||||
cursorPosition = db.cursorMessages(key, excluding, cursorPosition)
|
cursorPosition = db.cursorMessages(key, excluding, cursorPosition)
|
||||||
if( limiting.recovered > 0 ) {
|
|
||||||
if( !delayedIndexUpdates && counter>0 ) {
|
|
||||||
info("This machine seems to have delayed index updates.")
|
|
||||||
delayedIndexUpdates = true
|
|
||||||
}
|
|
||||||
found = true
|
|
||||||
} else {
|
|
||||||
// Seems like on some systems it takes a while for leveldb index updates
|
|
||||||
// to become visible for read. Need to figure out why this is, but until
|
|
||||||
// then, lets loop until we can read it.
|
|
||||||
if( counter > 10 ) {
|
|
||||||
found = true
|
|
||||||
} else {
|
|
||||||
counter+=1
|
|
||||||
// lets try to sync up /w the write thread..
|
|
||||||
val t = new CountDownLatch(1)
|
|
||||||
client.writeExecutorExec {
|
|
||||||
t.countDown()
|
|
||||||
}
|
|
||||||
t.await()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
override def setBatch(id: MessageId): Unit = {
|
override def setBatch(id: MessageId): Unit = {
|
||||||
|
|
Loading…
Reference in New Issue