Fixes a leveldb replication bug that can cause read errors: make sure the log is flushed for records that are being read.

Hiram Chirino 2013-10-08 18:37:06 -04:00
parent b1bf99425b
commit 119fdab1d0
1 changed file with 3 additions and 1 deletion


@@ -306,18 +306,20 @@ case class RecordLog(directory: File, logSuffix:String) {
   def read(record_position:Long) = {
     val offset = record_position-position
     val header = new Buffer(LOG_HEADER_SIZE)
+    check_read_flush(offset+LOG_HEADER_SIZE)
     channel.read(header.toByteBuffer, offset)
     val is = header.bigEndianEditor();
     val prefix = is.readByte()
     if( prefix != LOG_HEADER_PREFIX ) {
       // Does not look like a record.
-      throw new IOException("invalid record position")
+      throw new IOException("invalid record position %d (file: %s, offset: %d)".format(record_position, file.getName, offset))
     }
     val id = is.readByte()
     val expectedChecksum = is.readInt()
     val length = is.readInt()
     val data = new Buffer(length)
+    check_read_flush(offset+LOG_HEADER_SIZE+length)
     if( channel.read(data.toByteBuffer, offset+LOG_HEADER_SIZE) != length ) {
       throw new IOException("short record")
     }
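
The two new check_read_flush calls guard both reads: the first makes sure the header bytes through offset+LOG_HEADER_SIZE have been flushed before channel.read is called, the second does the same for the record body. The implementation of check_read_flush is not part of this hunk; the following is a minimal sketch of the idea, assuming the log keeps a flushed_offset high-water mark and exposes a flush() that drains its write buffer (both names are assumptions for illustration, not the actual RecordLog members):

import java.util.concurrent.atomic.AtomicLong

// Sketch under assumed internals: `flushed_offset` is a high-water mark of
// bytes already written through to the file; `flush()` drains buffered appends.
val flushed_offset = new AtomicLong(0)

def check_read_flush(end_offset: Long): Unit = {
  // A reader (e.g. a replication slave catching up) may ask for a record
  // whose bytes are still sitting in the write buffer; without a flush,
  // channel.read() would return stale or short data and the record header
  // prefix check above would fail with a read error.
  if (flushed_offset.get() < end_offset) {
    this.synchronized {
      flush()
    }
  }
}

Checking once per read (header, then body) rather than flushing on every call keeps the common case cheap: once flushed_offset has passed the requested end offset, the read proceeds without taking the lock.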