Fixes bug identified in APLO-284. Ported fix over from Apollo.

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1480709 13f79535-47bb-0310-9956-ffa450edef68
Hiram R. Chirino 2013-05-09 16:11:26 +00:00
parent 4a16c1ff27
commit 3ef7911a1d
3 changed files with 94 additions and 27 deletions
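
In outline, the ported fix has the store persist the journal's log-file start positions in the index under the new `:logs` key (`storeList`/`loadList` below) and, during index recovery, resolve a record position to its owning log file with a floor lookup in that recovered set (`recoveryLogs`) via the new `logRefKey` helper, instead of asking the live `RecordLog`. A minimal, self-contained sketch of that floor-key lookup follows; the object and values here are illustrative, not part of the patch:

    object FloorKeyLookupSketch {
      def main(args: Array[String]): Unit = {
        // Hypothetical start positions of three journal files, keyed like recoveryLogs.
        val logs = new java.util.TreeMap[java.lang.Long, Void]()
        for( start <- Seq(0L, 1024L, 2048L) ) logs.put(start, null)

        // floorKey returns the greatest key <= pos: the log file the record falls in.
        def logRefKey(pos: Long): Option[Long] =
          Option(logs.floorKey(pos)).map(_.longValue)

        assert(logRefKey(1500L) == Some(1024L)) // inside the second file
        assert(logRefKey(5000L) == Some(2048L)) // past the last start: last file
        assert(logRefKey(-1L).isEmpty)          // before any file: no owner
        println(logRefKey(100L))                // Some(0)
      }
    }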

@@ -64,6 +64,8 @@ object LevelDBClient extends Log {
   final val DIRTY_INDEX_KEY = bytes(":dirty")
   final val LOG_REF_INDEX_KEY = bytes(":log-refs")
+  final val LOGS_INDEX_KEY = bytes(":logs")
   final val COLLECTION_META_KEY = bytes(":collection-meta")

   final val TRUE = bytes("true")
   final val FALSE = bytes("false")
@@ -386,15 +388,17 @@ object LevelDBClient extends Log {
     var last_key:Array[Byte] = _
   }

-  def copy_index(from:File, to:File) = {
+  def copyIndex(from:File, to:File) = {
     for( file <- from.list_files ) {
-      val name = file.getName
-      if( name == "CURRENT" || name.startsWith("MANIFEST-") ) {
-        /// These might not be append only files, so avoid hard linking just to be safe.
-        file.copyTo(to / file.getName)
+      val name: String = file.getName
+      if( name.endsWith(".sst") ) {
+        // SST files don't change once created, safe to hard link.
+        file.linkTo(to / name)
+      } else if(name == "LOCK") {
+        // No need to copy the lock file.
       } else {
-        // These are append only files, so they are safe to hard link.
-        file.linkTo(to / file.getName)
+        /// These might not be append only files, so avoid hard linking just to be safe.
+        file.copyTo(to / name)
       }
     }
   }
@@ -441,7 +445,8 @@ class LevelDBClient(store: LevelDBStore) {
   var factory:DBFactory = _
   val logRefs = HashMap[Long, LongCounter]()
+  var recoveryLogs:java.util.TreeMap[Long, Void] = _
   val collectionMeta = HashMap[Long, CollectionMeta]()

   def plistIndexFile = directory / ("plist"+INDEX_SUFFIX)
@@ -574,12 +579,9 @@ class LevelDBClient(store: LevelDBStore) {
       dirtyIndexFile.recursiveDelete
       dirtyIndexFile.mkdirs()

-      lastSnapshotIndex.foreach { case (id, file) =>
+      // Resume log replay from a snapshot of the index..
+      for( (id, file)<- lastSnapshotIndex ) {
         try {
-          for( file <- file.list_files) {
-            file.linkTo(dirtyIndexFile / file.getName)
-          }
+          copyIndex(file, dirtyIndexFile)
        } catch {
          case e:Exception =>
            warn(e, "Could not recover snapshot of the index: "+e)
@@ -587,8 +589,15 @@ class LevelDBClient(store: LevelDBStore) {
        }
      }
      index = new RichDB(factory.open(dirtyIndexFile, indexOptions));
-     loadCounters
+     if ( store.paranoidChecks ) {
+       for(value <- index.get(DIRTY_INDEX_KEY) ) {
+         if( java.util.Arrays.equals(value, TRUE) ) {
+           warn("Recovering from a dirty index.")
+         }
+       }
+     }
      index.put(DIRTY_INDEX_KEY, TRUE)
+     loadCounters
    }
  }
@@ -694,23 +703,41 @@ class LevelDBClient(store: LevelDBStore) {
          // replay failed.. good thing we are in a retry block...
          index.close
          throw e;
+       } finally {
+         recoveryLogs = null
        }
      }
    }

  private def logRefDecrement(pos: Long) {
-    log.log_info(pos).foreach { logInfo =>
-      logRefs.get(logInfo.position).foreach { counter =>
+    for( key <- logRefKey(pos) ) {
+      logRefs.get(key).foreach { counter =>
        if (counter.decrementAndGet() == 0) {
-         logRefs.remove(logInfo.position)
+         logRefs.remove(key)
        }
      }
    }
  }

  private def logRefIncrement(pos: Long) {
-    log.log_info(pos).foreach { logInfo =>
-      logRefs.getOrElseUpdate(logInfo.position, new LongCounter()).incrementAndGet()
+    for( key <- logRefKey(pos) ) {
+      logRefs.getOrElseUpdate(key, new LongCounter()).incrementAndGet()
    }
  }

+  def logRefKey(pos: Long, log_info: RecordLog.LogInfo=null): Option[Long] = {
+    if( log_info!=null ) {
+      Some(log_info.position)
+    } else {
+      val rc = if( recoveryLogs !=null ) {
+        Option(recoveryLogs.floorKey(pos))
+      } else {
+        log.log_info(pos).map(_.position)
+      }
+      if( !rc.isDefined ) {
+        warn("Invalid log position: " + pos)
+      }
+      rc
+    }
+  }
@@ -741,8 +768,25 @@ class LevelDBClient(store: LevelDBStore) {
        case e => throw e
      }
    }

+   def storeList[T <: AnyRef](key:Array[Byte], list:Array[Long]) {
+     val baos = new ByteArrayOutputStream()
+     val os = new ObjectOutputStream(baos);
+     os.writeInt(list.size);
+     for( k <- list ) {
+       os.writeLong(k)
+     }
+     os.close()
+     try {
+       index.put(key, baos.toByteArray)
+     }
+     catch {
+       case e => throw e
+     }
+   }
+
    storeMap(LOG_REF_INDEX_KEY, logRefs)
    storeMap(COLLECTION_META_KEY, collectionMeta)
+   storeList(LOGS_INDEX_KEY, log.log_file_positions)
  }
@@ -758,8 +802,28 @@ class LevelDBClient(store: LevelDBStore) {
        }
      }
    }

+   def loadList[T <: AnyRef](key:Array[Byte]) = {
+     index.get(key, new ReadOptions).map { value=>
+       val rc = ListBuffer[Long]()
+       val bais = new ByteArrayInputStream(value)
+       val is = new ObjectInputStream(bais);
+       var remaining = is.readInt()
+       while(remaining > 0 ) {
+         rc.append(is.readLong())
+         remaining-=1
+       }
+       rc
+     }
+   }
+
    loadMap(LOG_REF_INDEX_KEY, logRefs)
    loadMap(COLLECTION_META_KEY, collectionMeta)
+   for( list <- loadList(LOGS_INDEX_KEY) ) {
+     recoveryLogs = new java.util.TreeMap[Long, Void]()
+     for( k <- list ) {
+       recoveryLogs.put(k, null)
+     }
+   }
  }
@@ -846,10 +910,8 @@ class LevelDBClient(store: LevelDBStore) {
      try {

-       // Hard link all the index files.
-       for( file <- dirtyIndexFile.list_files) {
-         file.linkTo(tmpDir / file.getName)
-       }
+       // Copy the index to the tmp dir.
+       copyIndex(dirtyIndexFile, tmpDir)

        // Rename to signal that the snapshot is complete.
        tmpDir.renameTo(snapshotIndexFile(walPosition))
@@ -1191,8 +1253,8 @@ class LevelDBClient(store: LevelDBStore) {
          appender.append(LOG_ADD_ENTRY, log_data)
          batch.put(key, index_data)

-         Option(log_info).orElse(log.log_info(dataLocator._1)).foreach { logInfo =>
-           logRefs.getOrElseUpdate(logInfo.position, new LongCounter()).incrementAndGet()
+         for( key <- logRefKey(pos, log_info) ) {
+           logRefs.getOrElseUpdate(key, new LongCounter()).incrementAndGet()
          }

          collectionIncrementSize(entry.queueKey, log_record.getEntryKey.toByteArray)
@@ -1331,7 +1393,7 @@ class LevelDBClient(store: LevelDBStore) {
      // the the
      var limit = oldest_retained_snapshot
-     val deleteLimit = log.log_info(limit).map(_.position).getOrElse(limit).min(log.appender_start)
+     val deleteLimit = logRefKey(limit).getOrElse(limit).min(log.appender_start)

      emptyJournals.foreach { id =>
        if ( id < deleteLimit ) {

@@ -492,6 +492,11 @@ case class RecordLog(directory: File, logSuffix:String) {
  def log_info(pos:Long) = log_mutex.synchronized { Option(log_infos.floorEntry(pos)).map(_.getValue) }

+ def log_file_positions = log_mutex.synchronized {
+   import collection.JavaConversions._
+   log_infos.map(_._2.position).toArray
+ }
+
  private def get_reader[T](record_position:Long)(func: (LogReader)=>T) = {
    val lookup = log_mutex.synchronized {

@@ -87,7 +87,7 @@ object ReplicationSupport {
    val index_file = index_dirs.last._2
    var target = to / index_file.getName
    target.mkdirs()
-   LevelDBClient.copy_index(index_file, target)
+   LevelDBClient.copyIndex(index_file, target)
  }
}
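
For reference, the `:logs` index entry that `storeList` writes is just an `ObjectOutputStream` frame: an int count followed by that many longs, which `loadList` reads back in order. A standalone round-trip sketch of that encoding, using plain java.io with no LevelDB index involved; the object and method names here are illustrative, not the patch's own:

    import java.io._
    import scala.collection.mutable.ListBuffer

    object LogsEntryCodecSketch {
      // Mirrors storeList's body: write the count first, then each log position.
      def encode(positions: Array[Long]): Array[Byte] = {
        val baos = new ByteArrayOutputStream()
        val os = new ObjectOutputStream(baos)
        os.writeInt(positions.length)
        positions.foreach( p => os.writeLong(p) )
        os.close()
        baos.toByteArray
      }

      // Mirrors loadList's body: read the count, then that many longs.
      def decode(value: Array[Byte]): ListBuffer[Long] = {
        val is = new ObjectInputStream(new ByteArrayInputStream(value))
        val rc = ListBuffer[Long]()
        var remaining = is.readInt()
        while( remaining > 0 ) {
          rc.append(is.readLong())
          remaining -= 1
        }
        rc
      }

      def main(args: Array[String]): Unit = {
        val positions = Array(0L, 1024L, 2048L)
        assert(decode(encode(positions)).toList == positions.toList)
        println(decode(encode(positions))) // ListBuffer(0, 1024, 2048)
      }
    }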