Fixes issue identified in APLO-245 where index does not seem to get cleaned up / compacted. Fix ported over from Apollo.

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1480711 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Hiram R. Chirino 2013-05-09 16:11:52 +00:00
parent d98ac90f28
commit 25356f2695
3 changed files with 106 additions and 11 deletions

View File

@ -104,4 +104,8 @@ public interface LevelDBStoreViewMBean {
@MBeanInfo("Gets the index statistics.")
String getIndexStats();
@MBeanInfo("Compacts disk usage")
void compact();
}

View File

@ -38,7 +38,7 @@ import java.text.SimpleDateFormat
import java.util.{Date, Collections}
import org.apache.activemq.leveldb.util.TimeMetric
import org.apache.activemq.leveldb.RecordLog.LogInfo
import scala.Some
import org.fusesource.leveldbjni.internal.JniDB
/**
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
@ -252,7 +252,7 @@ object LevelDBClient extends Log {
def cursorKeysPrefixed(prefix:Array[Byte], ro:ReadOptions=new ReadOptions)(func: Array[Byte] => Boolean): Unit = {
val iterator = db.iterator(ro)
iterator.seek(prefix);
might_trigger_compaction(iterator.seek(prefix));
try {
def check(key:Buffer) = {
key.startsWith(prefix) && func(key)
@ -267,7 +267,7 @@ object LevelDBClient extends Log {
def cursorPrefixed(prefix:Array[Byte], ro:ReadOptions=new ReadOptions)(func: (Array[Byte],Array[Byte]) => Boolean): Unit = {
val iterator = db.iterator(ro)
iterator.seek(prefix);
might_trigger_compaction(iterator.seek(prefix));
try {
def check(key:Buffer) = {
key.startsWith(prefix) && func(key, iterator.peekNext.getValue)
@ -286,7 +286,7 @@ object LevelDBClient extends Log {
def cursorRangeKeys(startIncluded:Array[Byte], endExcluded:Array[Byte], ro:ReadOptions=new ReadOptions)(func: Array[Byte] => Boolean): Unit = {
val iterator = db.iterator(ro)
iterator.seek(startIncluded);
might_trigger_compaction(iterator.seek(startIncluded));
try {
def check(key:Array[Byte]) = {
if ( compare(key,endExcluded) < 0) {
@ -305,7 +305,7 @@ object LevelDBClient extends Log {
def cursorRange(startIncluded:Array[Byte], endExcluded:Array[Byte], ro:ReadOptions=new ReadOptions)(func: (Array[Byte],Array[Byte]) => Boolean): Unit = {
val iterator = db.iterator(ro)
iterator.seek(startIncluded);
might_trigger_compaction(iterator.seek(startIncluded));
try {
def check(key:Array[Byte]) = {
(compare(key,endExcluded) < 0) && func(key, iterator.peekNext.getValue)
@ -337,7 +337,7 @@ object LevelDBClient extends Log {
val iterator = db.iterator(ro)
try {
iterator.seek(last);
might_trigger_compaction(iterator.seek(last));
if ( iterator.hasPrev ) {
iterator.prev()
} else {
@ -359,6 +359,35 @@ object LevelDBClient extends Log {
}
}
}
def compact = {
compact_needed = false
db match {
case db:JniDB =>
db.compactRange(null, null)
// case db:DbImpl =>
// val start = new Slice(Array[Byte]('a'.toByte))
// val end = new Slice(Array[Byte]('z'.toByte))
// db.compactRange(2, start, end)
case _ =>
}
}
private def might_trigger_compaction[T](func: => T): T = {
val start = System.nanoTime()
try {
func
} finally {
val duration = System.nanoTime() - start
// If it takes longer than 100 ms..
if( duration > 1000000*100 ) {
compact_needed = true
}
}
}
@volatile
var compact_needed = false
}
@ -1365,8 +1394,56 @@ class LevelDBClient(store: LevelDBStore) {
collectionMeta.get(collectionKey).flatMap(x=> Option(x.last_key)).map(new Buffer(_))
}
// APLO-245: lets try to detect when leveldb needs a compaction..
private def detect_if_compact_needed:Unit = {
// auto compaction might be disabled...
if ( store.autoCompactionRatio <= 0 ) {
return
}
// How much space is the dirty index using??
var index_usage = 0L
for( file <- dirtyIndexFile.recursiveList ) {
if(!file.isDirectory) {
index_usage += file.length()
}
}
// Lets use the log_refs to get a rough estimate on how many entries are store in leveldb.
var index_queue_entries=0L
for ( (_, count) <- logRefs ) {
index_queue_entries += count.get()
}
if ( index_queue_entries > 0 ) {
val ratio = (index_usage*1.0f/index_queue_entries)
// println("usage: index_usage:%d, index_queue_entries:%d, ratio: %f".format(index_usage, index_queue_entries, ratio))
// After running some load we empirically found that a healthy ratio is between 12 and 25 bytes per entry.
// lets compact if we go way over the healthy ratio.
if( ratio > store.autoCompactionRatio ) {
index.compact_needed = true
}
} else if( index_usage > 1024*1024*5 ) {
// at most the index should have 1 full level file.
index.compact_needed = true
}
}
def gc(topicPositions:Seq[(Long, Long)]):Unit = {
detect_if_compact_needed
// Lets compact the leveldb index if it looks like we need to.
if( index.compact_needed ) {
debug("Compacting the leveldb index at: %s", dirtyIndexFile)
val start = System.nanoTime()
index.compact
val duration = System.nanoTime() - start;
info("Compacted the leveldb index at: %s in %.2f ms", dirtyIndexFile, (duration / 1000000.0))
}
// Delete message refs for topics who's consumers have advanced..
if( !topicPositions.isEmpty ) {
retryUsingIndex {

View File

@ -33,8 +33,8 @@ import org.apache.activemq.broker.jmx.{BrokerMBeanSupport, AnnotatedMBean}
import org.apache.activemq.util._
import org.apache.activemq.leveldb.util.{RetrySupport, Log}
import org.apache.activemq.store.PList.PListIterator
import java.lang
import org.fusesource.hawtbuf.{UTF8Buffer, DataByteArrayOutputStream}
import org.fusesource.hawtdispatch;
object LevelDBStore extends Log {
val DEFAULT_DIRECTORY = new File("LevelDB");
@ -63,7 +63,7 @@ object LevelDBStore extends Log {
return IOExceptionSupport.create(e)
}
def waitOn(future: Future[AnyRef]): Unit = {
def waitOn(future: java.util.concurrent.Future[AnyRef]): Unit = {
try {
future.get
}
@ -116,6 +116,18 @@ class LevelDBStoreView(val store:LevelDBStore) extends LevelDBStoreViewMBean {
def resetMaxLogRotateLatency = db.client.log.max_log_rotate_latency.reset
def getIndexStats = db.client.index.getProperty("leveldb.stats")
def compact() {
import hawtdispatch._
var done = new CountDownLatch(1)
val positions = getTopicGCPositions
client.writeExecutor {
client.index.compact_needed = true
client.gc(positions)
done.countDown()
}
done.await()
}
}
import LevelDBStore._
@ -161,6 +173,8 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P
var asyncBufferSize = 1024*1024*4
@BeanProperty
var monitorStats = false
@BeanProperty
var autoCompactionRatio = 100
var purgeOnStatup: Boolean = false
@ -822,14 +836,14 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P
var pos = lastSeq.decrementAndGet()
add(pos, id, bs)
listSize.incrementAndGet()
new lang.Long(pos)
new java.lang.Long(pos)
}
def addLast(id: String, bs: ByteSequence): AnyRef = {
var pos = lastSeq.incrementAndGet()
add(pos, id, bs)
listSize.incrementAndGet()
new lang.Long(pos)
new java.lang.Long(pos)
}
def add(pos:Long, id: String, bs: ByteSequence) = {
@ -843,7 +857,7 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P
}
def remove(position: AnyRef): Boolean = {
val pos = position.asInstanceOf[lang.Long].longValue()
val pos = position.asInstanceOf[java.lang.Long].longValue()
val encoded_key = encodeLongLong(key, pos)
db.plistGet(encoded_key) match {
case Some(value) =>