From 2824a94af4040552feba3818b84ac2fdf1f4dffe Mon Sep 17 00:00:00 2001 From: Hiram Chirino Date: Fri, 1 Nov 2013 14:12:15 -0400 Subject: [PATCH] Continue to append to the last leveldb log file on a store restart. --- .../apache/activemq/leveldb/RecordLog.scala | 34 +++++++++++-------- 1 file changed, 19 insertions(+), 15 deletions(-) diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/RecordLog.scala b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/RecordLog.scala index d35becdcad..e69b58afba 100644 --- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/RecordLog.scala +++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/RecordLog.scala @@ -376,7 +376,7 @@ case class RecordLog(directory: File, logSuffix:String) { return Some(record_position+LOG_HEADER_SIZE+length, uow_start_pos) } - def verifyAndGetEndPosition:Long = { + def verifyAndGetEndOffset:Long = { var pos = position; var current_uow_start = pos val limit = position+channel.size() @@ -387,15 +387,15 @@ case class RecordLog(directory: File, logSuffix:String) { if( uow_start_pos == current_uow_start ) { current_uow_start = next } else { - return current_uow_start + return current_uow_start-position } } pos = next case None => - return current_uow_start + return current_uow_start-position } } - return current_uow_start + return current_uow_start-position } } @@ -417,33 +417,37 @@ case class RecordLog(directory: File, logSuffix:String) { val max_log_flush_latency = TimeMetric() val max_log_rotate_latency = TimeMetric() - def open(append_size:Long= -1) = { + def open(appender_size:Long= -1) = { log_mutex.synchronized { log_infos.clear() LevelDBClient.find_sequence_files(directory, logSuffix).foreach { case (position,file) => log_infos.put(position, LogInfo(file, position, file.length())) } - val appendPos = if( log_infos.isEmpty ) { + if( log_infos.isEmpty ) { create_appender(0,0) } else { val file = log_infos.lastEntry().getValue - if( append_size == -1 ) { + if( appender_size == -1 ) { val r = LogReader(file.file, file.position) try { - val actualLength = r.verifyAndGetEndPosition - val updated = file.copy(length = actualLength - file.position) - log_infos.put(updated.position, updated) - if( updated.file.length != file.length ) { - // we need to truncate. - using(new RandomAccessFile(file.file, "rw")) ( _.setLength(updated.length)) + val endOffset = r.verifyAndGetEndOffset + using(new RandomAccessFile(file.file, "rw")) { file=> + try { + file.getChannel.truncate(endOffset) + } + catch { + case e:Throwable => + e.printStackTrace() + } + file.getChannel.force(true) } - create_appender(actualLength,0) + create_appender(file.position,endOffset) } finally { r.release() } } else { - create_appender(file.position,append_size) + create_appender(file.position,appender_size) } } }