Continue to append to the last leveldb log file on a store restart.

This commit is contained in:
Hiram Chirino 2013-11-01 14:12:15 -04:00
parent 85fc686006
commit 2824a94af4
1 changed files with 19 additions and 15 deletions

View File

@ -376,7 +376,7 @@ case class RecordLog(directory: File, logSuffix:String) {
return Some(record_position+LOG_HEADER_SIZE+length, uow_start_pos) return Some(record_position+LOG_HEADER_SIZE+length, uow_start_pos)
} }
def verifyAndGetEndPosition:Long = { def verifyAndGetEndOffset:Long = {
var pos = position; var pos = position;
var current_uow_start = pos var current_uow_start = pos
val limit = position+channel.size() val limit = position+channel.size()
@ -387,15 +387,15 @@ case class RecordLog(directory: File, logSuffix:String) {
if( uow_start_pos == current_uow_start ) { if( uow_start_pos == current_uow_start ) {
current_uow_start = next current_uow_start = next
} else { } else {
return current_uow_start return current_uow_start-position
} }
} }
pos = next pos = next
case None => case None =>
return current_uow_start return current_uow_start-position
} }
} }
return current_uow_start return current_uow_start-position
} }
} }
@ -417,33 +417,37 @@ case class RecordLog(directory: File, logSuffix:String) {
val max_log_flush_latency = TimeMetric() val max_log_flush_latency = TimeMetric()
val max_log_rotate_latency = TimeMetric() val max_log_rotate_latency = TimeMetric()
def open(append_size:Long= -1) = { def open(appender_size:Long= -1) = {
log_mutex.synchronized { log_mutex.synchronized {
log_infos.clear() log_infos.clear()
LevelDBClient.find_sequence_files(directory, logSuffix).foreach { case (position,file) => LevelDBClient.find_sequence_files(directory, logSuffix).foreach { case (position,file) =>
log_infos.put(position, LogInfo(file, position, file.length())) log_infos.put(position, LogInfo(file, position, file.length()))
} }
val appendPos = if( log_infos.isEmpty ) { if( log_infos.isEmpty ) {
create_appender(0,0) create_appender(0,0)
} else { } else {
val file = log_infos.lastEntry().getValue val file = log_infos.lastEntry().getValue
if( append_size == -1 ) { if( appender_size == -1 ) {
val r = LogReader(file.file, file.position) val r = LogReader(file.file, file.position)
try { try {
val actualLength = r.verifyAndGetEndPosition val endOffset = r.verifyAndGetEndOffset
val updated = file.copy(length = actualLength - file.position) using(new RandomAccessFile(file.file, "rw")) { file=>
log_infos.put(updated.position, updated) try {
if( updated.file.length != file.length ) { file.getChannel.truncate(endOffset)
// we need to truncate.
using(new RandomAccessFile(file.file, "rw")) ( _.setLength(updated.length))
} }
create_appender(actualLength,0) catch {
case e:Throwable =>
e.printStackTrace()
}
file.getChannel.force(true)
}
create_appender(file.position,endOffset)
} finally { } finally {
r.release() r.release()
} }
} else { } else {
create_appender(file.position,append_size) create_appender(file.position,appender_size)
} }
} }
} }