From b41121e359c44781554bd45fe9ad912cf3c11bb4 Mon Sep 17 00:00:00 2001 From: "Hiram R. Chirino" Date: Fri, 24 May 2013 22:33:32 +0000 Subject: [PATCH] This should fixed the problem with the delayed leveldb index updates. git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1486242 13f79535-47bb-0310-9956-ffa450edef68 --- .../leveldb/LevelDBStoreViewMBean.java | 3 -- .../apache/activemq/leveldb/DBManager.scala | 3 +- .../activemq/leveldb/LevelDBStore.scala | 28 ------------------- 3 files changed, 1 insertion(+), 33 deletions(-) diff --git a/activemq-leveldb-store/src/main/java/org/apache/activemq/leveldb/LevelDBStoreViewMBean.java b/activemq-leveldb-store/src/main/java/org/apache/activemq/leveldb/LevelDBStoreViewMBean.java index 015c5636a5..68507950ae 100644 --- a/activemq-leveldb-store/src/main/java/org/apache/activemq/leveldb/LevelDBStoreViewMBean.java +++ b/activemq-leveldb-store/src/main/java/org/apache/activemq/leveldb/LevelDBStoreViewMBean.java @@ -108,7 +108,4 @@ public interface LevelDBStoreViewMBean { @MBeanInfo("Compacts disk usage") void compact(); - @MBeanInfo("Are delayed index updates occurring?") - boolean getDelayedIndexUpdates(); - } diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala index 500a739ff2..0cf0157be7 100644 --- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala +++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala @@ -331,7 +331,6 @@ class DelayableUOW(val manager:DBManager) extends BaseRetained { val s = size if( manager.asyncCapacityRemaining.addAndGet(-s) > 0 ) { asyncCapacityUsed = s - countDownFuture.set(null) manager.parent.blocking_executor.execute(^{ complete_listeners.foreach(_()) }) @@ -353,11 +352,11 @@ class DelayableUOW(val manager:DBManager) extends BaseRetained { asyncCapacityUsed = 0 } else { manager.uow_complete_latency.add(System.nanoTime() - disposed_at) - countDownFuture.set(null) manager.parent.blocking_executor.execute(^{ complete_listeners.foreach(_()) }) } + countDownFuture.set(null) for( (id, action) <- actions ) { if( !action.enqueues.isEmpty ) { diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala index d746ffd3ba..d63a63d372 100644 --- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala +++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala @@ -98,7 +98,6 @@ class LevelDBStoreView(val store:LevelDBStore) extends LevelDBStoreViewMBean { def getParanoidChecks = paranoidChecks def getSync = sync def getVerifyChecksums = verifyChecksums - def getDelayedIndexUpdates = delayedIndexUpdates def getUowClosedCounter = db.uowClosedCounter def getUowCanceledCounter = db.uowCanceledCounter @@ -184,7 +183,6 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P val topics = collection.mutable.HashMap[ActiveMQTopic, LevelDBStore#LevelDBTopicMessageStore]() val topicsById = collection.mutable.HashMap[Long, LevelDBStore#LevelDBTopicMessageStore]() val plists = collection.mutable.HashMap[String, LevelDBStore#LevelDBPList]() - var delayedIndexUpdates = false def init() = {} @@ -708,35 +706,9 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P } def recoverNextMessages(maxReturned: Int, listener: MessageRecoveryListener): Unit = { - var found = false - var counter = 0; - while( !found ) { val limiting = LimitingRecoveryListener(maxReturned, listener) val excluding = PreparedExcluding(limiting) cursorPosition = db.cursorMessages(key, excluding, cursorPosition) - if( limiting.recovered > 0 ) { - if( !delayedIndexUpdates && counter>0 ) { - info("This machine seems to have delayed index updates.") - delayedIndexUpdates = true - } - found = true - } else { - // Seems like on some systems it takes a while for leveldb index updates - // to become visible for read. Need to figure out why this is, but until - // then, lets loop until we can read it. - if( counter > 10 ) { - found = true - } else { - counter+=1 - // lets try to sync up /w the write thread.. - val t = new CountDownLatch(1) - client.writeExecutorExec { - t.countDown() - } - t.await() - } - } - } } override def setBatch(id: MessageId): Unit = {